You are viewing a plain-text version of this content; the link to its canonical version was lost in the plain-text conversion.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/02 23:52:21 UTC

[incubator-pinot] 04/08: Controller side code

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b13946066ce3e30375f18ddabd4023c113219eb6
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Dec 31 12:04:54 2020 -0800

    Controller side code
---
 .../segment/RealtimeSegmentZKMetadata.java         |   6 -
 .../helix/core/PinotHelixResourceManager.java      |  89 ++++-----
 .../helix/core/PinotTableIdealStateBuilder.java    |   9 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 201 +++++++++++++--------
 .../fakestream/FakePartitionGroupMetadata.java     |  48 -----
 .../kafka09/KafkaPartitionGroupMetadata.java       |  48 -----
 .../kafka20/KafkaPartitionGroupMetadata.java       |  48 -----
 .../pinot/spi/stream/PartitionGroupMetadata.java   |  52 +++++-
 8 files changed, 207 insertions(+), 294 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
index c46af53..d88be18 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
@@ -35,7 +35,6 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
   private Status _status = null;
   private int _sizeThresholdToFlushSegment = -1;
   private String _timeThresholdToFlushSegment = null; // store as period string for readability
-  private String _partitionGroupMetadataStr = null;
 
   public RealtimeSegmentZKMetadata() {
     setSegmentType(SegmentType.REALTIME);
@@ -50,7 +49,6 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
     if (flushThresholdTime != null && !flushThresholdTime.equals(NULL)) {
       _timeThresholdToFlushSegment = znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_TIME);
     }
-    _partitionGroupMetadataStr = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_GROUP_METADATA);
   }
 
   @Override
@@ -143,8 +141,4 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
   public void setTimeThresholdToFlushSegment(String timeThresholdPeriodString) {
     _timeThresholdToFlushSegment = timeThresholdPeriodString;
   }
-
-  public String getPartitionGroupMetadataStr() {
-    return _partitionGroupMetadataStr;
-  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 7f36e5a..1ff3f9d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -85,7 +85,6 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -124,6 +123,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -1337,65 +1337,50 @@ public class PinotHelixResourceManager {
         IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig));
     IdealState idealState = getTableIdealState(realtimeTableName);
 
+
     if (streamConfig.isShardedConsumerType()) {
-      setupShardedRealtimeTable(streamConfig, idealState, realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber());
-    }
+      idealState = PinotTableIdealStateBuilder
+          .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
+              _enableBatchMessageMode);
+      _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
+      LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
+      _pinotLLCRealtimeSegmentManager.setupNewShardedTable(rawRealtimeTableConfig, idealState);
+    } else {
 
-    if (streamConfig.hasHighLevelConsumerType()) {
-      if (idealState == null) {
-        LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
-        idealState = PinotTableIdealStateBuilder
-            .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager,
-                _propertyStore, _enableBatchMessageMode);
-        _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
-      } else {
-        // Remove LLC segments if it is not configured
-        if (!streamConfig.hasLowLevelConsumerType()) {
-          _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
+      if (streamConfig.hasHighLevelConsumerType()) {
+        if (idealState == null) {
+          LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
+          idealState = PinotTableIdealStateBuilder
+              .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager,
+                  _propertyStore, _enableBatchMessageMode);
+          _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
+        } else {
+          // Remove LLC segments if it is not configured
+          if (!streamConfig.hasLowLevelConsumerType()) {
+            _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
+          }
         }
+        // For HLC table, property store entry must exist to trigger watchers to create segments
+        ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName);
       }
-      // For HLC table, property store entry must exist to trigger watchers to create segments
-      ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName);
-    }
-
-    // Either we have only low-level consumer, or both.
-    if (streamConfig.hasLowLevelConsumerType()) {
-      // Will either create idealstate entry, or update the IS entry with new segments
-      // (unless there are low-level segments already present)
-      if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
-        PinotTableIdealStateBuilder
-            .buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager, realtimeTableName, realtimeTableConfig,
-                idealState, _enableBatchMessageMode);
-        LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
-      } else {
-        LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
+
+      // Either we have only low-level consumer, or both.
+      if (streamConfig.hasLowLevelConsumerType()) {
+        // Will either create idealstate entry, or update the IS entry with new segments
+        // (unless there are low-level segments already present)
+        if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
+          idealState = PinotTableIdealStateBuilder
+              .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
+                  _enableBatchMessageMode);
+          _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
+          LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
+        } else {
+          LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
+        }
       }
     }
   }
 
-  /**
-   * Sets up the realtime table ideal state
-   * @param streamConfig
-   */
-  private void setupShardedRealtimeTable(StreamConfig streamConfig, IdealState idealState, int numReplicas) {
-    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
-    StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
-        .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis());
-
-    // get current partition groups and their metadata - this will be empty when creating the table
-    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = _pinotLLCRealtimeSegmentManager.getCurrentPartitionGroupMetadataList(idealState);
-
-    // get new partition groups and their metadata,
-    // Assume table has 3 shards. Say we get [0], [1], [2] groups (for now assume that each group contains only 1 shard)
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 5000);
-
-    // setup segment zk metadata and ideal state for all the new found partition groups
-    _pinotLLCRealtimeSegmentManager.setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas);
-  }
-
-
-
   private void ensurePropertyStoreEntryExistsForHighLevelConsumer(String realtimeTableName) {
     String propertyStorePath = ZKMetadataProvider.constructPropertyStorePathForResource(realtimeTableName);
     if (!_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 1e95966..a7b3c9e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -94,9 +94,8 @@ public class PinotTableIdealStateBuilder {
     return idealState;
   }
 
-  public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
-      String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState,
-      boolean enableBatchMessageMode) {
+  public static IdealState buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig,
+      IdealState idealState, boolean enableBatchMessageMode) {
 
     // Validate replicasPerPartition here.
     final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
@@ -105,7 +104,7 @@ public class PinotTableIdealStateBuilder {
     }
     final int nReplicas;
     try {
-      nReplicas = Integer.valueOf(replicasPerPartitionStr);
+      nReplicas = Integer.parseInt(replicasPerPartitionStr);
     } catch (NumberFormatException e) {
       throw new PinotHelixResourceManager.InvalidTableConfigException(
           "Invalid value for replicasPerPartition, expected a number: " + replicasPerPartitionStr, e);
@@ -113,7 +112,7 @@ public class PinotTableIdealStateBuilder {
     if (idealState == null) {
       idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode);
     }
-    pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
+    return idealState;
   }
 
   public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 6b220f2..0de5e91 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
@@ -44,6 +45,7 @@ import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -161,82 +163,84 @@ public class PinotLLCRealtimeSegmentManager {
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
   }
 
+
   /**
-   * The committing segment will call this.
-   * 
-   * For example, say we have 3 shards, grouped into PartitionGroups as [0], [1], [2]
-   * Now segment of PG (partition group) 0 is committing. First, we'll update the metadata to DONE, and ideal state to ONLINE
-   * Then, the currentPartitionGroupMetadata list will contain - [1], [2]
-   * The newPartitionGroupMetadata list will contain - [0], [1], [2]
-   * We then get the set of PGs for which new segments need to be made - [0]
+   * Using the ideal state and segment metadata, return a list of the current partition groups
    */
-  public void commitPartitionGroup(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
-    TableConfig realtimeTableConfig = getTableConfig(realtimeTableName);
-    StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig));
-    int numReplicas = realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber();
-    IdealState idealState = getIdealState(realtimeTableName);
-
-    // update status in segment metadata to DONE
-    // ..
-
-    // update Ideal State for this segment to ONLINE
-    // ..
-
-    // fetch current partition groups (which are actively CONSUMING - from example above, [1], [2])
-    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
-
-    // get new partition groups (honor any groupings which are already consuming - [0], [1], [2])
-    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
-    StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
-        .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis());
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
-
-    // from the above list, remove the partition groups which are already CONSUMING
-    // i.e. newPartitionGroups - currentPartitionGroups. Therefore, ([0], [1], [2]) - ([1], [2]) = ([0])
-    // ..
-
-    // setup segment metadata and ideal state for the new found  partition groups
-    setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas);
-  }
+  public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) {
+    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
 
-  public void setupIdealStateForConsuming(List<SegmentZKMetadata> segmentZKMetadata, int numReplicas) {
-    // add all segments from the list to ideal state, with state CONSUMING
-  }
+    // from all segment names in the ideal state, find unique groups
+    Map<Integer, LLCSegmentName> groupIdToLatestSegment = new HashMap<>();
+    for (String segment : idealState.getPartitionSet()) {
+      LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
+      int partitionGroupId = llcSegmentName.getPartitionGroupId();
+      groupIdToLatestSegment.compute(partitionGroupId, (k, latestSegment) -> {
+        if (latestSegment == null) {
+          return llcSegmentName;
+        } else {
+          return latestSegment.getSequenceNumber() > llcSegmentName.getSequenceNumber() ? latestSegment
+              : llcSegmentName;
+        }
+      });
+    }
 
-  public void persistSegmentMetadata(List<SegmentZKMetadata> segmentMetadata) {
-    // persist new segment metadata from list to zk
+    // create a PartitionGroupMetadata for each latest segment
+    for (Map.Entry<Integer, LLCSegmentName> entry : groupIdToLatestSegment.entrySet()) {
+      int partitionGroupId = entry.getKey();
+      LLCSegmentName llcSegmentName = entry.getValue();
+      RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = ZKMetadataProvider
+          .getRealtimeSegmentZKMetadata(_propertyStore, llcSegmentName.getTableName(), llcSegmentName.getSegmentName());
+      Preconditions.checkNotNull(realtimeSegmentZKMetadata);
+      LLCRealtimeSegmentZKMetadata llRealtimeSegmentZKMetadata =
+          (LLCRealtimeSegmentZKMetadata) realtimeSegmentZKMetadata;
+      PartitionGroupMetadata partitionGroupMetadata =
+          new PartitionGroupMetadata(partitionGroupId, llcSegmentName.getSequenceNumber(),
+              llRealtimeSegmentZKMetadata.getStartOffset(), llRealtimeSegmentZKMetadata.getEndOffset(),
+              llRealtimeSegmentZKMetadata.getStatus().toString());
+      partitionGroupMetadataList.add(partitionGroupMetadata);
+    }
+    return partitionGroupMetadataList;
   }
 
   /**
-   * Using the list of partition group metadata, create a list of equivalent segment zk metadata
+   * Sets up the realtime table ideal state for a table of consumer type SHARDED
    */
-  public List<SegmentZKMetadata> constructSegmentMetadata(List<PartitionGroupMetadata> partitionGroupMetadataList) {
-    List<SegmentZKMetadata> segmentZKMetadata = new ArrayList<>();
-    // for each partition group construct a segment zk metadata object
-    return segmentZKMetadata;
-  }
+  public void setupNewShardedTable(TableConfig tableConfig, IdealState idealState) {
+    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
-  /**
-   * Using the ideal state, return a list of the current partition groups
-   */
-  public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) {
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
-    // from all segment names in the ideal state, find unique groups
+    String realtimeTableName = tableConfig.getTableName();
+    LOGGER.info("Setting up new SHARDED table: {}", realtimeTableName);
 
-    // create a PartitionGroupMetadata, one for each group
-    return partitionGroupMetadataList;
-  }
+    _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
 
-  public void setupNewPartitionGroups(List<PartitionGroupMetadata> newPartitionGroupMetadataList, int numReplicas) {
-    // construct segment zk metadata for the new partition groups
-    List<SegmentZKMetadata> segmentMetadata = constructSegmentMetadata(newPartitionGroupMetadataList);
+    PartitionLevelStreamConfig streamConfig =
+        new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+
+    // get new partition groups and their metadata
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
+        .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis());
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000);
+    int numPartitionGroups = newPartitionGroupMetadataList.size();
+
+    InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
+    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
-    // create these new segments metadata
-    persistSegmentMetadata(segmentMetadata);
+    SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+        Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
 
-    // setup ideal state for the new segments
-    setupIdealStateForConsuming(segmentMetadata, numReplicas);
+    long currentTimeMs = getCurrentTimeMs();
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+      String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata.getPartitionGroupId(),
+          currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
+      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
+          instancePartitionsMap);
+    }
+    setIdealState(realtimeTableName, idealState);
   }
 
   public boolean getIsSplitCommitEnabled() {
@@ -532,13 +536,50 @@ public class PinotLLCRealtimeSegmentManager {
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
     // Step-2
+
+    // Say we currently were consuming from 3 shards A, B, C. Of those, A is the one committing. Also suppose that new partition D has come up
+    // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
+    StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
+        .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis());
+    // find new partition groups [A],[B],[C],[D]
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
+
+    // create new segment metadata, only if it is not IN_PROGRESS in the current state
+    Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
+        Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
+
+    List<String> newConsumingSegmentNames = new ArrayList<>();
+    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
     long newSegmentCreationTimeMs = getCurrentTimeMs();
-    LLCSegmentName newLLCSegmentName =
-        getNextLLCSegmentName(new LLCSegmentName(committingSegmentName), newSegmentCreationTimeMs);
-    createNewSegmentZKMetadata(tableConfig,
-        new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)),
-        newLLCSegmentName, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata,
-        instancePartitions, numPartitions, numReplicas);
+    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+      int newPartitionGroupId = partitionGroupMetadata.getPartitionGroupId();
+      PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
+      if (currentPartitionGroupMetadata == null) { // not present in current state
+        // make new segment
+        LLCSegmentName newLLCSegmentName =
+            new LLCSegmentName(rawTableName, newPartitionGroupId, STARTING_SEQUENCE_NUMBER, newSegmentCreationTimeMs);
+        createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(),
+                IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs,
+            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+        newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
+      } else {
+        String currentStatus = currentPartitionGroupMetadata.getStatus();
+        if (!currentStatus.equals(Status.IN_PROGRESS.toString())) { // not IN_PROGRESS anymore in current state
+          // make new segment
+          LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId,
+              currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+          createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(),
+                  IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs,
+              committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+          newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
+        }
+      }
+    }
+
 
     // Step-3
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -554,7 +595,7 @@ public class PinotLLCRealtimeSegmentManager {
     Lock lock = _idealStateUpdateLocks[lockIndex];
     try {
       lock.lock();
-      updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newLLCSegmentName.getSegmentName(),
+      updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentNames,
           segmentAssignment, instancePartitionsMap);
     } finally {
       lock.unlock();
@@ -845,7 +886,7 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
-      String newSegmentName, SegmentAssignment segmentAssignment,
+      List<String> newSegmentNames, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
     HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
       assert idealState != null;
@@ -862,14 +903,18 @@ public class PinotLLCRealtimeSegmentManager {
             "Exceeded max segment completion time for segment " + committingSegmentName);
       }
       updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
-          newSegmentName, segmentAssignment, instancePartitionsMap);
+          null, segmentAssignment, instancePartitionsMap);
+      for (String newSegmentName : newSegmentNames) {
+        updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), null,
+            newSegmentName, segmentAssignment, instancePartitionsMap);
+      }
       return idealState;
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
   }
 
   @VisibleForTesting
   void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
-      @Nullable String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment,
+      @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
     if (committingSegmentName != null) {
       // Change committing segment state to ONLINE
@@ -880,11 +925,11 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     // Assign instances to the new segment and add instances as state CONSUMING
-    List<String> instancesAssigned =
-        segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
-    instanceStatesMap.put(newSegmentName,
-        SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
-    LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
+    if (newSegmentName != null) {
+      List<String> instancesAssigned = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
+      instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+      LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
+    }
   }
 
   /*
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java
deleted file mode 100644
index 78ee12c..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.pinot.core.realtime.impl.fakestream;
-
-import org.apache.pinot.spi.stream.Checkpoint;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-
-
-public class FakePartitionGroupMetadata implements PartitionGroupMetadata {
-
-  private final int _groupId;
-  public FakePartitionGroupMetadata(int groupId) {
-    _groupId = groupId;
-  }
-
-  @Override
-  public int getGroupId() {
-    return getGroupId();
-  }
-
-  @Override
-  public Checkpoint getStartCheckpoint() {
-    return null;
-  }
-
-  @Override
-  public Checkpoint getEndCheckpoint() {
-    return null;
-  }
-
-  @Override
-  public void setStartCheckpoint(Checkpoint startCheckpoint) {
-
-  }
-
-  @Override
-  public void setEndCheckpoint(Checkpoint endCheckpoint) {
-
-  }
-
-  @Override
-  public byte[] serialize() {
-    return new byte[0];
-  }
-
-  @Override
-  public PartitionGroupMetadata deserialize(byte[] blob) {
-    return null;
-  }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java
deleted file mode 100644
index 1d792ac..0000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.pinot.plugin.stream.kafka09;
-
-import org.apache.pinot.spi.stream.Checkpoint;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-
-
-public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata {
-
-  private final int _groupId;
-  public KafkaPartitionGroupMetadata(int partitionId) {
-    _groupId = partitionId;
-  }
-
-  @Override
-  public int getGroupId() {
-    return _groupId;
-  }
-
-  @Override
-  public Checkpoint getStartCheckpoint() {
-    return null;
-  }
-
-  @Override
-  public Checkpoint getEndCheckpoint() {
-    return null;
-  }
-
-  @Override
-  public void setStartCheckpoint(Checkpoint startCheckpoint) {
-
-  }
-
-  @Override
-  public void setEndCheckpoint(Checkpoint endCheckpoint) {
-
-  }
-
-  @Override
-  public byte[] serialize() {
-    return new byte[0];
-  }
-
-  @Override
-  public PartitionGroupMetadata deserialize(byte[] blob) {
-    return null;
-  }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java
deleted file mode 100644
index 31ae75a..0000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.pinot.plugin.stream.kafka20;
-
-import org.apache.pinot.spi.stream.Checkpoint;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-
-
-public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata {
-
-  private final int _groupId;
-  public KafkaPartitionGroupMetadata(int partitionId) {
-    _groupId = partitionId;
-  }
-
-  @Override
-  public int getGroupId() {
-    return _groupId;
-  }
-
-  @Override
-  public Checkpoint getStartCheckpoint() {
-    return null;
-  }
-
-  @Override
-  public Checkpoint getEndCheckpoint() {
-    return null;
-  }
-
-  @Override
-  public void setStartCheckpoint(Checkpoint startCheckpoint) {
-
-  }
-
-  @Override
-  public void setEndCheckpoint(Checkpoint endCheckpoint) {
-
-  }
-
-  @Override
-  public byte[] serialize() {
-    return new byte[0];
-  }
-
-  @Override
-  public PartitionGroupMetadata deserialize(byte[] blob) {
-    return null;
-  }
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index 0f44173..f662d99 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -18,22 +18,70 @@
  */
 package org.apache.pinot.spi.stream;
 
-import java.util.List;
+/**
+ * Mutable holder for the state of one partition group of a stream: its id (fixed at construction), a sequence
+ * number, the start and end checkpoints in string form, and a status string.
+ */
+public class PartitionGroupMetadata {
 
+  // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider)
+  private final int _partitionGroupId;
+  private int _sequenceNumber;
+  private String _startCheckpoint;
+  private String _endCheckpoint;
+  private String _status;
 
-public interface PartitionGroupMetadata {
+  /**
+   * Creates the metadata for one partition group.
+   *
+   * @param partitionGroupId id of the partition group; cannot be changed afterwards
+   * @param sequenceNumber sequence number for the group (presumably the LLC segment sequence number - TODO confirm)
+   * @param startCheckpoint start checkpoint, in string form
+   * @param endCheckpoint end checkpoint, in string form
+   * @param status status of the group, in string form
+   */
+  public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, String startCheckpoint,
+      String endCheckpoint, String status) {
+    _partitionGroupId = partitionGroupId;
+    _sequenceNumber = sequenceNumber;
+    _startCheckpoint = startCheckpoint;
+    _endCheckpoint = endCheckpoint;
+    _status = status;
+  }
 
-  int getGroupId();
+  public void setSequenceNumber(int sequenceNumber) {
+    _sequenceNumber = sequenceNumber;
+  }
 
-  Checkpoint getStartCheckpoint(); // similar to getStartOffset
+  public void setStartCheckpoint(String startCheckpoint) {
+    _startCheckpoint = startCheckpoint;
+  }
 
-  Checkpoint getEndCheckpoint(); // similar to getEndOffset
+  public void setEndCheckpoint(String endCheckpoint) {
+    _endCheckpoint = endCheckpoint;
+  }
 
-  void setStartCheckpoint(Checkpoint startCheckpoint);
+  public int getPartitionGroupId() {
+    return _partitionGroupId;
+  }
 
-  void setEndCheckpoint(Checkpoint endCheckpoint);
+  public int getSequenceNumber() {
+    return _sequenceNumber;
+  }
 
-  byte[] serialize();
+  public String getStartCheckpoint() {
+    return _startCheckpoint;
+  }
 
-  PartitionGroupMetadata deserialize(byte[] blob);
+  public String getEndCheckpoint() {
+    return _endCheckpoint;
+  }
+
+  public String getStatus() {
+    return _status;
+  }
+
+  public void setStatus(String status) {
+    _status = status;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org