Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/21 19:11:01 UTC

[incubator-pinot] branch remove_consuming_partition_info updated (200bef6 -> f83cbb5)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch remove_consuming_partition_info
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 200bef6  Remove the partition info from the consuming segment ZK metadata
 discard fcf2b7a  Handle the partitioning mismatch between table config and stream
     add d9aec17  Improve the realtime time creation unit test (#6032)
     add 5548e79  Table indexing config validation (#6017)
     add 8511410  Publish helm package pinot 0.2.1 (#6034)
     add 0dbe06d  Publish helm repo with new index (#6035)
     add fe047fd  Support streaming query in QueryExecutor (#6027)
     add 919f407  Handle the partitioning mismatch between table config and stream (#6031)
     add 73f0459  Add Broker Reduce Time Log (#6033)
     new f83cbb5  Remove the partition info from the consuming segment ZK metadata

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (200bef6)
            \
             N -- N -- N   refs/heads/remove_consuming_partition_info (f83cbb5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/README.md => kubernetes/helm/README-dev.md    |  25 +-
 kubernetes/helm/index.yaml                         |  34 ++-
 kubernetes/helm/pinot-0.2.1.tgz                    | Bin 0 -> 23883 bytes
 kubernetes/helm/pinot/Chart.yaml                   |   4 +-
 .../requesthandler/BaseBrokerRequestHandler.java   |   4 +-
 .../segmentpruner/PartitionSegmentPruner.java      |  32 +-
 .../helix/core/PinotHelixResourceManager.java      |   1 -
 .../realtime/PinotLLCRealtimeSegmentManager.java   |   6 +-
 .../api/PinotTableRestletResourceTest.java         |  13 +-
 .../core/query/executor/GrpcQueryExecutor.java     | 327 ---------------------
 .../pinot/core/query/executor/QueryExecutor.java   |  26 +-
 .../query/executor/ServerQueryExecutorV1Impl.java  |  29 +-
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  76 ++++-
 .../apache/pinot/core/util/TableConfigUtils.java   | 114 ++++++-
 .../query/scheduler/PrioritySchedulerTest.java     |  36 ++-
 .../pinot/core/util/TableConfigUtilsTest.java      | 154 ++++++++++
 ...PartitionLLCRealtimeClusterIntegrationTest.java | 170 ++++++++++-
 ...ulls_default_column_test_missing_columns.schema |   4 +-
 .../pinot/server/starter/ServerInstance.java       |   5 +-
 .../spi/utils/builder/TableConfigBuilder.java      |  15 +
 20 files changed, 668 insertions(+), 407 deletions(-)
 copy docs/README.md => kubernetes/helm/README-dev.md (67%)
 create mode 100644 kubernetes/helm/pinot-0.2.1.tgz
 delete mode 100644 pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Remove the partition info from the consuming segment ZK metadata

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch remove_consuming_partition_info
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f83cbb569448d883250ae6d57d1070e80f5b3657
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Mon Sep 21 12:10:37 2020 -0700

    Remove the partition info from the consuming segment ZK metadata
---
 .../segmentpruner/PartitionSegmentPruner.java      |  32 +++-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  28 +--
 ...PartitionLLCRealtimeClusterIntegrationTest.java | 199 +++++++++++++++++----
 3 files changed, 192 insertions(+), 67 deletions(-)
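
Reading aid, not part of the commit: the pruner change below makes the extraction step distinguish three outcomes. It returns null when the ZK record is missing or the segment is still consuming (retry later), INVALID_PARTITION_INFO when a completed segment has no usable partition metadata (do not retry), and otherwise the partition info itself. A minimal sketch of that rule follows, assuming a simplified model in which partition info is a plain Set<Integer>. The class PartitionInfoCacheSketch and all of its members are hypothetical and are not Pinot APIs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the cache update rule; not Pinot code.
final class PartitionInfoCacheSketch {
  // Sentinel: metadata was read, but it carries no usable partition info (no retry).
  static final Set<Integer> INVALID_PARTITION_INFO = Collections.unmodifiableSet(new HashSet<>());

  private final Map<String, Set<Integer>> cache = new HashMap<>();

  // Returns null when the caller should retry later, the sentinel when it should not,
  // and the partition set otherwise.
  static Set<Integer> extract(boolean recordPresent, boolean consuming, Set<Integer> partitions) {
    if (!recordPresent) {
      return null; // record missing, possibly a transient issue
    }
    if (consuming) {
      return null; // partition metadata is only written when the segment is committed
    }
    if (partitions == null) {
      return INVALID_PARTITION_INFO;
    }
    return partitions;
  }

  // Mirrors the put-or-remove shape of refreshSegment() in the diff.
  void refresh(String segment, Set<Integer> extracted) {
    if (extracted != null) {
      cache.put(segment, extracted);
    } else {
      cache.remove(segment);
    }
  }

  boolean isCached(String segment) {
    return cache.containsKey(segment);
  }

  Set<Integer> get(String segment) {
    return cache.get(segment);
  }

  public static void main(String[] args) {
    PartitionInfoCacheSketch sketch = new PartitionInfoCacheSketch();
    // Consuming segment: nothing is cached.
    sketch.refresh("seg", extract(true, true, null));
    System.out.println(sketch.isCached("seg"));
    // Same segment after commit: the partition set is cached.
    sketch.refresh("seg", extract(true, false, Collections.singleton(1)));
    System.out.println(sketch.get("seg"));
  }
}
```

Keeping "no entry" separate from the sentinel is what allows a later refresh to pick up the partition metadata written at commit time.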

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
index 8320b30..181bad9 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
@@ -32,7 +32,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.data.partition.PartitionFunction;
@@ -76,17 +76,32 @@ public class PartitionSegmentPruner implements SegmentPruner {
     List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT);
     for (int i = 0; i < numSegments; i++) {
       String segment = segments.get(i);
-      _partitionInfoMap.put(segment, extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i)));
+      PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
+      if (partitionInfo != null) {
+        _partitionInfoMap.put(segment, partitionInfo);
+      }
     }
   }
 
+  /**
+   * NOTE: Returns {@code null} when the ZNRecord is missing (could be transient Helix issue), or the segment is a
+   *       consuming segment so that we can retry later. Returns {@link #INVALID_PARTITION_INFO} when the segment does
+   *       not have valid partition metadata in its ZK metadata, in which case we won't retry later.
+   */
+  @Nullable
   private PartitionInfo extractPartitionInfoFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) {
     if (znRecord == null) {
       LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _tableNameWithType);
-      return INVALID_PARTITION_INFO;
+      return null;
     }
 
-    String partitionMetadataJson = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_METADATA);
+    // Skip processing the partition metadata for the consuming segment because the partition metadata is updated when
+    // the consuming segment is committed
+    if (Segment.Realtime.Status.IN_PROGRESS.name().equals(znRecord.getSimpleField(Segment.Realtime.STATUS))) {
+      return null;
+    }
+
+    String partitionMetadataJson = znRecord.getSimpleField(Segment.PARTITION_METADATA);
     if (partitionMetadataJson == null) {
       LOGGER.warn("Failed to find segment partition metadata for segment: {}, table: {}", segment, _tableNameWithType);
       return INVALID_PARTITION_INFO;
@@ -127,8 +142,13 @@ public class PartitionSegmentPruner implements SegmentPruner {
 
   @Override
   public synchronized void refreshSegment(String segment) {
-    _partitionInfoMap.put(segment, extractPartitionInfoFromSegmentZKMetadataZNRecord(segment,
-        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
+    PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment,
+        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT));
+    if (partitionInfo != null) {
+      _partitionInfoMap.put(segment, partitionInfo);
+    } else {
+      _partitionInfoMap.remove(segment);
+    }
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 43ea74c..b85bdb6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -69,8 +68,6 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -552,12 +549,8 @@ public class PinotLLCRealtimeSegmentManager {
     newSegmentZKMetadata.setNumReplicas(numReplicas);
     newSegmentZKMetadata.setStatus(Status.IN_PROGRESS);
 
-    // Add the partition metadata if available
-    SegmentPartitionMetadata partitionMetadata =
-        getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionId());
-    if (partitionMetadata != null) {
-      newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
-    }
+    // NOTE: DO NOT add the partition metadata for the consuming segment to prevent mis-pruning the segment when the
+    //       stream is not partitioned properly.
 
     // Update the flush threshold
     FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
@@ -568,23 +561,6 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   @Nullable
-  private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId) {
-    SegmentPartitionConfig partitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-    if (partitionConfig == null) {
-      return null;
-    }
-    Map<String, ColumnPartitionMetadata> partitionMetadataMap = new TreeMap<>();
-    for (Map.Entry<String, ColumnPartitionConfig> entry : partitionConfig.getColumnPartitionMap().entrySet()) {
-      String columnName = entry.getKey();
-      ColumnPartitionConfig columnPartitionConfig = entry.getValue();
-      partitionMetadataMap.put(columnName,
-          new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), columnPartitionConfig.getNumPartitions(),
-              Collections.singleton(partitionId)));
-    }
-    return new SegmentPartitionMetadata(partitionMetadataMap);
-  }
-
-  @Nullable
   private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
     Map<String, ColumnPartitionMetadata> partitionMetadataMap = new HashMap<>();
     for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index c520599..b049dbb 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
@@ -30,6 +31,7 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -45,6 +47,7 @@ import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -52,8 +55,14 @@ import static org.testng.Assert.assertTrue;
  * Integration test that enables segment partition for the LLC real-time table.
  */
 public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest {
-  // Number of documents in the first Avro file
-  private static final long NUM_DOCS = 9292;
+  private static final String PARTITION_COLUMN = "Carrier";
+  // Number of documents in the first and second Avro file
+  private static final long NUM_DOCS_IN_FIRST_AVRO_FILE = 9292;
+  private static final long NUM_DOCS_IN_SECOND_AVRO_FILE = 8736;
+
+  private List<File> _avroFiles;
+  private String _partitionColumn;
+  private long _countStarResult;
 
   @BeforeClass
   public void setUp()
@@ -70,32 +79,34 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
     startKafka();
 
     // Unpack the Avro files
-    List<File> avroFiles = unpackAvroData(_tempDir);
+    _avroFiles = unpackAvroData(_tempDir);
 
     // Create and upload the schema and table config with reduced number of columns and partition config
-    Schema schema =
-        new Schema.SchemaBuilder().setSchemaName(getSchemaName()).addSingleValueDimension("Carrier", DataType.STRING)
-            .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(getSchemaName())
+        .addSingleValueDimension(PARTITION_COLUMN, DataType.STRING)
+        .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
     addSchema(schema);
 
-    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+    TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
     indexingConfig.setSegmentPartitionConfig(
-        new SegmentPartitionConfig(Collections.singletonMap("Carrier", new ColumnPartitionConfig("murmur", 5))));
+        new SegmentPartitionConfig(Collections.singletonMap(PARTITION_COLUMN, new ColumnPartitionConfig("murmur", 5))));
     tableConfig.setRoutingConfig(
         new RoutingConfig(null, Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null));
     addTableConfig(tableConfig);
 
     // Push data into Kafka (only ingest the first Avro file)
-    pushAvroIntoKafka(Collections.singletonList(avroFiles.get(0)));
+    _partitionColumn = PARTITION_COLUMN;
+    pushAvroIntoKafka(Collections.singletonList(_avroFiles.get(0)));
 
     // Wait for all documents loaded
+    _countStarResult = NUM_DOCS_IN_FIRST_AVRO_FILE;
     waitForAllDocsLoaded(600_000L);
   }
 
   @Override
   protected long getCountStarResult() {
-    return NUM_DOCS;
+    return _countStarResult;
   }
 
   @Override
@@ -105,6 +116,12 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
 
   @Nullable
   @Override
+  protected String getPartitionColumn() {
+    return _partitionColumn;
+  }
+
+  @Nullable
+  @Override
   protected List<String> getInvertedIndexColumns() {
     return null;
   }
@@ -129,47 +146,159 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
 
   @Test
   public void testPartitionMetadata() {
+    int[] numCompletedSegmentsForPartition = new int[2];
     List<RealtimeSegmentZKMetadata> segmentZKMetadataList =
         _helixResourceManager.getRealtimeSegmentMetadata(getTableName());
     for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
-      SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
-      assertNotNull(segmentPartitionMetadata);
-      Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
-          segmentPartitionMetadata.getColumnPartitionMap();
-      assertEquals(columnPartitionMetadataMap.size(), 1);
-      ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier");
-      assertNotNull(columnPartitionMetadata);
+      if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
+        // For consuming segment, there should be no partition metadata
+        assertNull(segmentZKMetadata.getPartitionMetadata());
+      } else {
+        // Completed segment
 
-      // The function name should be aligned with the partition config in the table config
-      assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+        SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+        assertNotNull(segmentPartitionMetadata);
+        Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
+            segmentPartitionMetadata.getColumnPartitionMap();
+        assertEquals(columnPartitionMetadataMap.size(), 1);
+        ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get(PARTITION_COLUMN);
+        assertNotNull(columnPartitionMetadata);
 
-      if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
-        // Consuming segment
+        // The function name should be aligned with the partition config in the table config
+        assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+
+        // Number of partitions should be the same as number of stream partitions
+        assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
+
+        // Should contain only one partition, which is the same as the stream partition
+        int streamPartition = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId();
+        assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(streamPartition));
+
+        numCompletedSegmentsForPartition[streamPartition]++;
+      }
+    }
+
+    // There should be 0 completed segments for partition 0, 2 completed segments for partition 1
+    assertEquals(numCompletedSegmentsForPartition[0], 0);
+    assertEquals(numCompletedSegmentsForPartition[1], 2);
+  }
+
+  @Test(dependsOnMethods = "testPartitionMetadata")
+  public void testPartitionRouting()
+      throws Exception {
+    // Query partition 0
+    {
+      String query = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'UA'";
+      JsonNode response = postQuery(query);
+
+      String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE Carrier BETWEEN 'UA' AND 'UA'";
+      JsonNode responseToCompare = postQuery(queryToCompare);
+
+      // Should only query the consuming segments for partition 0 and partition 1
+      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 2);
+      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
+
+      assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
+          responseToCompare.get("aggregationResults").get(0).get("value").asInt());
+    }
+
+    // Query partition 1
+    {
+      String query = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'AA'";
+      JsonNode response = postQuery(query);
+
+      String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE Carrier BETWEEN 'AA' AND 'AA'";
+      JsonNode responseToCompare = postQuery(queryToCompare);
 
-        // Number of partitions should be aligned with the partition config in the table config
-        assertEquals(columnPartitionMetadata.getNumPartitions(), 5);
+      // Should query all the segments
+      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
+      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
 
-        // Should contain only the stream partition
-        assertEquals(columnPartitionMetadata.getPartitions(),
-            Collections.singleton(new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId()));
+      assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
+          responseToCompare.get("aggregationResults").get(0).get("value").asInt());
+    }
+  }
+
+  @Test(dependsOnMethods = "testPartitionRouting")
+  public void testNonPartitionedStream()
+      throws Exception {
+    // Push the second Avro file into Kafka without partitioning
+    _partitionColumn = null;
+    pushAvroIntoKafka(Collections.singletonList(_avroFiles.get(1)));
+
+    // Wait for all documents loaded
+    _countStarResult = NUM_DOCS_IN_FIRST_AVRO_FILE + NUM_DOCS_IN_SECOND_AVRO_FILE;
+    waitForAllDocsLoaded(600_000L);
+
+    // Check partition metadata
+    List<RealtimeSegmentZKMetadata> segmentZKMetadataList =
+        _helixResourceManager.getRealtimeSegmentMetadata(getTableName());
+    for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
+        // For consuming segment, there should be no partition metadata
+        assertNull(segmentZKMetadata.getPartitionMetadata());
       } else {
         // Completed segment
 
+        SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+        assertNotNull(segmentPartitionMetadata);
+        Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
+            segmentPartitionMetadata.getColumnPartitionMap();
+        assertEquals(columnPartitionMetadataMap.size(), 1);
+        ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get(PARTITION_COLUMN);
+        assertNotNull(columnPartitionMetadata);
+
+        // The function name should be aligned with the partition config in the table config
+        assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+
         // Number of partitions should be the same as number of stream partitions
         assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
 
-        // Should contain the partitions based on the ingested records. Since the records are not partitioned in Kafka,
-        // it should contain all the partitions.
-        assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1)));
+        // The partition metadata for the new completed segments should contain both partitions
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
+        int streamPartition = llcSegmentName.getPartitionId();
+        int sequenceNumber = llcSegmentName.getSequenceNumber();
+        if (streamPartition == 0 || (streamPartition == 1 && sequenceNumber >= 2)) {
+          assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1)));
+        }
       }
     }
-  }
 
-  // TODO: Add test on partition routing once the consuming segment behavior is fixed.
-  //       Currently the partition info is cached in the PartitionSegmentPruner, and won't be reloaded when the
-  //       consuming segment gets committed. The segment will be pruned based on the consuming segment partition info
-  //       (using stream partition as the segment partition), even if the partition info changed for the completed
-  //       segment.
+    // Check partition routing
+    int numSegments = segmentZKMetadataList.size();
+
+    // Query partition 0
+    {
+      String query = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'UA'";
+      JsonNode response = postQuery(query);
+
+      String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE Carrier BETWEEN 'UA' AND 'UA'";
+      JsonNode responseToCompare = postQuery(queryToCompare);
+
+      // Should skip the first 2 completed segments for partition 1
+      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments - 2);
+      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments);
+
+      assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
+          responseToCompare.get("aggregationResults").get(0).get("value").asInt());
+    }
+
+    // Query partition 1
+    {
+      String query = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'AA'";
+      JsonNode response = postQuery(query);
+
+      String queryToCompare = "SELECT COUNT(*) FROM mytable WHERE Carrier BETWEEN 'AA' AND 'AA'";
+      JsonNode responseToCompare = postQuery(queryToCompare);
+
+      // Should query all the segments
+      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments);
+      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments);
+
+      assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
+          responseToCompare.get("aggregationResults").get(0).get("value").asInt());
+    }
+  }
 
   @AfterClass
   public void tearDown()
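
Reading aid, not part of the commit: the segment counts asserted in testPartitionRouting can be reproduced with a small model. It assumes that a segment with no cached partition info (for example a consuming segment) is always queried, and that a segment with cached info is queried only when its partition set contains the partition of the filter value. The class PartitionRoutingSketch and the segment names are hypothetical. The query's partition id is passed in directly rather than computed with Pinot's murmur function.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of partition-based segment pruning; not Pinot code.
final class PartitionRoutingSketch {
  // Counts the segments that survive pruning for the given query partition.
  static int numSegmentsQueried(List<String> segments, Map<String, Set<Integer>> cachedPartitions,
      int queryPartition) {
    int numQueried = 0;
    for (String segment : segments) {
      Set<Integer> partitions = cachedPartitions.get(segment);
      // No cached info (assumed here for consuming segments): the segment cannot be pruned.
      if (partitions == null || partitions.contains(queryPartition)) {
        numQueried++;
      }
    }
    return numQueried;
  }

  // Two completed segments for partition 1, as asserted in testPartitionMetadata.
  static Map<String, Set<Integer>> exampleCache() {
    Map<String, Set<Integer>> cache = new HashMap<>();
    cache.put("p1_seq0", Collections.singleton(1));
    cache.put("p1_seq1", Collections.singleton(1));
    return cache;
  }

  // The two completed segments plus one consuming segment per stream partition.
  static List<String> exampleSegments() {
    return Arrays.asList("p0_seq0", "p1_seq0", "p1_seq1", "p1_seq2");
  }

  public static void main(String[] args) {
    System.out.println(numSegmentsQueried(exampleSegments(), exampleCache(), 0)); // prints 2
    System.out.println(numSegmentsQueried(exampleSegments(), exampleCache(), 1)); // prints 4
  }
}
```

Under these assumptions a filter value in partition 0 reaches only the two consuming segments, while a value in partition 1 reaches all four, which matches the NUM_SEGMENTS_QUERIED values asserted for the equality queries in the test.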

