You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/17 23:25:54 UTC

[incubator-pinot] 01/01: Remove the partition info from the consuming segment ZK metadata

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch remove_consuming_partition_info
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 200bef6ecd05c177b829b0b6ab1995916c13a7c6
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Thu Sep 17 16:12:24 2020 -0700

    Remove the partition info from the consuming segment ZK metadata
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 41 ++--------------------
 ...PartitionLLCRealtimeClusterIntegrationTest.java | 39 ++++++++++----------
 2 files changed, 22 insertions(+), 58 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d93072a..5d47b1a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -69,8 +68,6 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -550,12 +547,8 @@ public class PinotLLCRealtimeSegmentManager {
     newSegmentZKMetadata.setNumReplicas(numReplicas);
     newSegmentZKMetadata.setStatus(Status.IN_PROGRESS);
 
-    // Add the partition metadata if available
-    SegmentPartitionMetadata partitionMetadata =
-        getPartitionMetadataFromTableConfig(tableConfig, numPartitions, newLLCSegmentName.getPartitionId());
-    if (partitionMetadata != null) {
-      newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
-    }
+    // NOTE: DO NOT add the partition metadata for the consuming segment; otherwise the segment could be mis-pruned
+    //       when the stream is not partitioned properly
 
     // Update the flush threshold
     FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
@@ -566,36 +559,6 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   @Nullable
-  private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int numPartitions,
-      int partitionId) {
-    SegmentPartitionConfig partitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-    if (partitionConfig == null) {
-      return null;
-    }
-    Map<String, ColumnPartitionMetadata> partitionMetadataMap = new TreeMap<>();
-    for (Map.Entry<String, ColumnPartitionConfig> entry : partitionConfig.getColumnPartitionMap().entrySet()) {
-      String columnName = entry.getKey();
-      ColumnPartitionConfig columnPartitionConfig = entry.getValue();
-
-      // NOTE: Here we compare the number of partitions from the config and the stream, and log a warning when they
-      //       don't match, but use the one from the stream. The mismatch could happen when the stream partitions are
-      //       changed, but the table config has not been updated to reflect the change. In such case, picking the
-      //       number of partitions from the stream can keep the segment properly partitioned as long as the partition
-      //       function is not changed.
-      if (columnPartitionConfig.getNumPartitions() != numPartitions) {
-        LOGGER.warn(
-            "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
-            numPartitions, columnPartitionConfig.getNumPartitions());
-      }
-
-      partitionMetadataMap.put(columnName,
-          new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), numPartitions,
-              Collections.singleton(partitionId)));
-    }
-    return new SegmentPartitionMetadata(partitionMetadataMap);
-  }
-
-  @Nullable
   private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
     Map<String, ColumnPartitionMetadata> partitionMetadataMap = new HashMap<>();
     for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index b98dd7d..0866131 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
-import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -44,6 +43,7 @@ import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -129,27 +129,28 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
     List<RealtimeSegmentZKMetadata> segmentZKMetadataList =
         _helixResourceManager.getRealtimeSegmentMetadata(getTableName());
     for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
-      SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
-      assertNotNull(segmentPartitionMetadata);
-      Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
-          segmentPartitionMetadata.getColumnPartitionMap();
-      assertEquals(columnPartitionMetadataMap.size(), 1);
-      ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier");
-      assertNotNull(columnPartitionMetadata);
-
-      // The function name should be aligned with the partition config in the table config
-      assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
-
-      // Number of partitions should be the same as number of stream partitions
-      assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
 
       if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
-        // For consuming segment, should contain only the stream partition
-        assertEquals(columnPartitionMetadata.getPartitions(),
-            Collections.singleton(new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId()));
+        // For consuming segment, there should be no partition metadata
+        assertNull(segmentZKMetadata.getPartitionMetadata());
       } else {
-        // For completed segment, should contain the partitions based on the ingested records. Since the records are not
-        // partitioned in Kafka, it should contain all partitions.
+        // Completed segment
+
+        SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+        assertNotNull(segmentPartitionMetadata);
+        Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
+            segmentPartitionMetadata.getColumnPartitionMap();
+        assertEquals(columnPartitionMetadataMap.size(), 1);
+        ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier");
+        assertNotNull(columnPartitionMetadata);
+
+        // The function name should be aligned with the partition config in the table config
+        assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+
+        // Number of partitions should be the same as number of stream partitions
+        assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
+
+        // Should contain all partitions as the records are not partitioned in Kafka
         assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1)));
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org