You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/09/24 07:41:25 UTC

[GitHub] [pinot] mqliang commented on a change in pull request #7397: [7264] Add Exception to Broker Response When Not All Segments Are Available (Partial Response)

mqliang commented on a change in pull request #7397:
URL: https://github.com/apache/pinot/pull/7397#discussion_r715382060



##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -487,6 +491,11 @@ private BrokerResponseNative handleSQLRequest(long requestId, String query, Json
       _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED, 1);
     }
 
+    if (0 != numUnavailableSegments) {

Review comment:
       The variable `numUnavailableSegments` seems redundant; instead, use
   * `requestStatistics.setNumUnavailableSegments(unavailableSegments.size())` at L445
   * `if (!unavailableSegments.isEmpty())` at L494
   The same applies in other places.

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
##########
@@ -0,0 +1,153 @@
+package org.apache.pinot.core.query.executor;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.PinotMetricUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class QueryExecutorExceptionsTest {
+  private static final String AVRO_DATA_PATH = "data/simpleData200001.avro";
+  private static final String EMPTY_JSON_DATA_PATH = "data/test_empty_data.json";
+  private static final String QUERY_EXECUTOR_CONFIG_PATH = "conf/query-executor.properties";
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "QueryExecutorTest");
+  private static final String TABLE_NAME = "testTable";
+  private static final int NUM_SEGMENTS_TO_GENERATE = 2;
+  private static final int NUM_EMPTY_SEGMENTS_TO_GENERATE = 2;
+  private static final Pql2Compiler COMPILER = new Pql2Compiler();

Review comment:
       We have deprecated PQL. All PQL-related code will be cleaned up in the future. Please try to use SqlCompiler instead.

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
##########
@@ -0,0 +1,153 @@
+package org.apache.pinot.core.query.executor;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.PinotMetricUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class QueryExecutorExceptionsTest {
+  private static final String AVRO_DATA_PATH = "data/simpleData200001.avro";
+  private static final String EMPTY_JSON_DATA_PATH = "data/test_empty_data.json";
+  private static final String QUERY_EXECUTOR_CONFIG_PATH = "conf/query-executor.properties";
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "QueryExecutorTest");
+  private static final String TABLE_NAME = "testTable";
+  private static final int NUM_SEGMENTS_TO_GENERATE = 2;
+  private static final int NUM_EMPTY_SEGMENTS_TO_GENERATE = 2;
+  private static final Pql2Compiler COMPILER = new Pql2Compiler();
+  private static final ExecutorService QUERY_RUNNERS = Executors.newFixedThreadPool(20);
+
+  private final List<ImmutableSegment> _indexSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
+  private final List<String> _segmentNames = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
+
+  private ServerMetrics _serverMetrics;
+  private QueryExecutor _queryExecutor;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    // Set up the segments
+    FileUtils.deleteQuietly(INDEX_DIR);
+    Assert.assertTrue(INDEX_DIR.mkdirs());
+    URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
+    Assert.assertNotNull(resourceUrl);
+    File avroFile = new File(resourceUrl.getFile());
+    Schema schema = SegmentTestUtils.extractSchemaFromAvroWithoutTime(avroFile);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    int i = 0;
+    for (; i < NUM_SEGMENTS_TO_GENERATE; i++) {
+      SegmentGeneratorConfig config = SegmentTestUtils
+          .getSegmentGeneratorConfig(avroFile, FileFormat.AVRO, INDEX_DIR, TABLE_NAME, tableConfig, schema);
+      config.setSegmentNamePostfix(Integer.toString(i));
+      SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+      driver.init(config);
+      driver.build();
+      IngestionSchemaValidator ingestionSchemaValidator = driver.getIngestionSchemaValidator();
+      Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
+      Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+      Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
+      Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+      _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
+      _segmentNames.add(driver.getSegmentName());
+    }
+    resourceUrl = getClass().getClassLoader().getResource(EMPTY_JSON_DATA_PATH);
+    Assert.assertNotNull(resourceUrl);
+    File jsonFile = new File(resourceUrl.getFile());
+    for (; i < NUM_SEGMENTS_TO_GENERATE + NUM_EMPTY_SEGMENTS_TO_GENERATE; i++) {
+      SegmentGeneratorConfig config = SegmentTestUtils
+          .getSegmentGeneratorConfig(jsonFile, FileFormat.JSON, INDEX_DIR, TABLE_NAME, tableConfig, schema);
+      config.setSegmentNamePostfix(Integer.toString(i));
+      SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+      driver.init(config);
+      driver.build();
+      _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
+      _segmentNames.add(driver.getSegmentName());
+    }
+
+    // Mock the instance data manager
+    _serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    TableDataManagerConfig tableDataManagerConfig = mock(TableDataManagerConfig.class);
+    when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
+    when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
+    when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    @SuppressWarnings("unchecked")
+    TableDataManager tableDataManager = TableDataManagerProvider
+        .getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
+            mock(ServerMetrics.class), mock(HelixManager.class), null);
+    tableDataManager.start();
+    //we don't add index segments to the data manager to simulate numSegmentsAcquired < numSegmentsQueried
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    when(instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
+
+    // Set up the query executor
+    resourceUrl = getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH);
+    Assert.assertNotNull(resourceUrl);
+    PropertiesConfiguration queryExecutorConfig = new PropertiesConfiguration();
+    queryExecutorConfig.setDelimiterParsingDisabled(false);
+    queryExecutorConfig.load(new File(resourceUrl.getFile()));
+    _queryExecutor = new ServerQueryExecutorV1Impl();
+    _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), instanceDataManager, _serverMetrics);
+  }
+
+  @Test
+  public void testServerSegmentMissingErrorCode() {
+    String query = "SELECT COUNT(*) FROM " + TABLE_NAME;
+    InstanceRequest instanceRequest = new InstanceRequest(0L, COMPILER.compileToBrokerRequest(query));
+    instanceRequest.setSearchSegments(_segmentNames);
+    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    Map<Integer, String> exceptions = instanceResponse.getExceptions();
+    Assert.assertTrue(exceptions.containsKey(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE));

Review comment:
       It would be better to also check the list of missing segment names.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org