You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/17 23:25:54 UTC
[incubator-pinot] 01/01: Remove the partition info from the consuming segment ZK metadata
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch remove_consuming_partition_info
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 200bef6ecd05c177b829b0b6ab1995916c13a7c6
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Thu Sep 17 16:12:24 2020 -0700
Remove the partition info from the consuming segment ZK metadata
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 41 ++--------------------
...PartitionLLCRealtimeClusterIntegrationTest.java | 39 ++++++++++----------
2 files changed, 22 insertions(+), 58 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d93072a..5d47b1a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -69,8 +68,6 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.filesystem.PinotFS;
@@ -550,12 +547,8 @@ public class PinotLLCRealtimeSegmentManager {
newSegmentZKMetadata.setNumReplicas(numReplicas);
newSegmentZKMetadata.setStatus(Status.IN_PROGRESS);
- // Add the partition metadata if available
- SegmentPartitionMetadata partitionMetadata =
- getPartitionMetadataFromTableConfig(tableConfig, numPartitions, newLLCSegmentName.getPartitionId());
- if (partitionMetadata != null) {
- newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
- }
+ // NOTE: DO NOT add the partition metadata for the consuming segment to prevent mis-pruning the segment when the
+ // stream is not partitioned properly
// Update the flush threshold
FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
@@ -566,36 +559,6 @@ public class PinotLLCRealtimeSegmentManager {
}
@Nullable
- private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int numPartitions,
- int partitionId) {
- SegmentPartitionConfig partitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
- if (partitionConfig == null) {
- return null;
- }
- Map<String, ColumnPartitionMetadata> partitionMetadataMap = new TreeMap<>();
- for (Map.Entry<String, ColumnPartitionConfig> entry : partitionConfig.getColumnPartitionMap().entrySet()) {
- String columnName = entry.getKey();
- ColumnPartitionConfig columnPartitionConfig = entry.getValue();
-
- // NOTE: Here we compare the number of partitions from the config and the stream, and log a warning when they
- // don't match, but use the one from the stream. The mismatch could happen when the stream partitions are
- // changed, but the table config has not been updated to reflect the change. In such case, picking the
- // number of partitions from the stream can keep the segment properly partitioned as long as the partition
- // function is not changed.
- if (columnPartitionConfig.getNumPartitions() != numPartitions) {
- LOGGER.warn(
- "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
- numPartitions, columnPartitionConfig.getNumPartitions());
- }
-
- partitionMetadataMap.put(columnName,
- new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), numPartitions,
- Collections.singleton(partitionId)));
- }
- return new SegmentPartitionMetadata(partitionMetadataMap);
- }
-
- @Nullable
private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
Map<String, ColumnPartitionMetadata> partitionMetadataMap = new HashMap<>();
for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index b98dd7d..0866131 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
-import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -44,6 +43,7 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -129,27 +129,28 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
List<RealtimeSegmentZKMetadata> segmentZKMetadataList =
_helixResourceManager.getRealtimeSegmentMetadata(getTableName());
for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
- SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
- assertNotNull(segmentPartitionMetadata);
- Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
- segmentPartitionMetadata.getColumnPartitionMap();
- assertEquals(columnPartitionMetadataMap.size(), 1);
- ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier");
- assertNotNull(columnPartitionMetadata);
-
- // The function name should be aligned with the partition config in the table config
- assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
-
- // Number of partitions should be the same as number of stream partitions
- assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
- // For consuming segment, should contain only the stream partition
- assertEquals(columnPartitionMetadata.getPartitions(),
- Collections.singleton(new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId()));
+ // For consuming segment, there should be no partition metadata
+ assertNull(segmentZKMetadata.getPartitionMetadata());
} else {
- // For completed segment, should contain the partitions based on the ingested records. Since the records are not
- // partitioned in Kafka, it should contain all partitions.
+ // Completed segment
+
+ SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+ assertNotNull(segmentPartitionMetadata);
+ Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
+ segmentPartitionMetadata.getColumnPartitionMap();
+ assertEquals(columnPartitionMetadataMap.size(), 1);
+ ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier");
+ assertNotNull(columnPartitionMetadata);
+
+ // The function name should be aligned with the partition config in the table config
+ assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+
+ // Number of partitions should be the same as number of stream partitions
+ assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
+
+ // Should contain all partitions as the records are not partitioned in Kafka
assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1)));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org