Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/05 18:21:12 UTC

[incubator-pinot] branch sharded_consumer_type_support_with_kinesis updated: Remove unused classes and changes

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
     new 05e126b  Remove unused classes and changes
05e126b is described below

commit 05e126b1cf0664cc020f424a3a4b1b19f4023205
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Tue Jan 5 10:20:43 2021 -0800

    Remove unused classes and changes
---
 .../apache/pinot/common/utils/CommonConstants.java |  4 -
 .../helix/core/PinotHelixResourceManager.java      | 61 ++++++--------
 .../helix/core/PinotTableIdealStateBuilder.java    | 10 ++-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 96 ++++++++++++----------
 .../impl/fakestream/FakeStreamConsumerFactory.java |  5 +-
 .../fakestream/FakeStreamMetadataProvider.java     |  8 +-
 ...lakyConsumerRealtimeClusterIntegrationTest.java |  2 -
 .../kafka09/KafkaPartitionLevelConsumerTest.java   |  4 +-
 .../kafka20/KafkaPartitionLevelConsumer.java       |  1 -
 .../kafka20/KafkaStreamMetadataProvider.java       |  1 -
 .../plugin/stream/kinesis/KinesisConsumer.java     |  2 +-
 .../org/apache/pinot/spi/stream/FetchResult.java   | 24 ------
 .../org/apache/pinot/spi/stream/MessageBatch.java  |  2 -
 .../spi/stream/PartitionGroupMetadataList.java     | 30 -------
 .../org/apache/pinot/spi/stream/StreamConfig.java  |  6 +-
 .../pinot/spi/stream/StreamMetadataProvider.java   |  4 +-
 16 files changed, 91 insertions(+), 169 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 7a91d8c..9773e7e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -379,10 +379,6 @@ public class CommonConstants {
     public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time";
     public static final String PARTITION_METADATA = "segment.partition.metadata";
     /**
-     * Serialized {@link org.apache.pinot.spi.stream.PartitionGroupMetadata} for this segment
-     */
-    public static final String PARTITION_GROUP_METADATA = "segment.partition.group.metadata";
-    /**
      * This field is used for parallel push protection to lock the segment globally.
      * We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the
      * next upload won't be blocked forever.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5388eeb..af99860 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1332,45 +1332,34 @@ public class PinotHelixResourceManager {
         IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig));
     IdealState idealState = getTableIdealState(realtimeTableName);
 
-
-    if (streamConfig.isShardedConsumerType()) {
-      idealState = PinotTableIdealStateBuilder
-          .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
-              _enableBatchMessageMode);
-      _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
-      LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName);
-    } else {
-
-      if (streamConfig.hasHighLevelConsumerType()) {
-        if (idealState == null) {
-          LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
-          idealState = PinotTableIdealStateBuilder
-              .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager,
-                  _propertyStore, _enableBatchMessageMode);
-          _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
-        } else {
-          // Remove LLC segments if it is not configured
-          if (!streamConfig.hasLowLevelConsumerType()) {
-            _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
-          }
+    if (streamConfig.hasHighLevelConsumerType()) {
+      if (idealState == null) {
+        LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
+        idealState = PinotTableIdealStateBuilder
+            .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager,
+                _propertyStore, _enableBatchMessageMode);
+        _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
+      } else {
+        // Remove LLC segments if it is not configured
+        if (!streamConfig.hasLowLevelConsumerType()) {
+          _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
         }
-        // For HLC table, property store entry must exist to trigger watchers to create segments
-        ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName);
       }
+      // For HLC table, property store entry must exist to trigger watchers to create segments
+      ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName);
+    }
 
-      // Either we have only low-level consumer, or both.
-      if (streamConfig.hasLowLevelConsumerType()) {
-        // Will either create idealstate entry, or update the IS entry with new segments
-        // (unless there are low-level segments already present)
-        if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
-          idealState = PinotTableIdealStateBuilder
-              .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
-                  _enableBatchMessageMode);
-          _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
-          LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
-        } else {
-          LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
-        }
+    // Either we have only low-level consumer, or both.
+    if (streamConfig.hasLowLevelConsumerType()) {
+      // Will either create idealstate entry, or update the IS entry with new segments
+      // (unless there are low-level segments already present)
+      if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
+        PinotTableIdealStateBuilder
+            .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
+                _enableBatchMessageMode);
+        LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
+      } else {
+        LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
       }
     }
   }
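
For context, the restored HLC/LLC branches above are driven by the consumer type declared in the table's stream configs. Below is a minimal, hypothetical sketch (not part of this commit) of a stream config map that would make hasLowLevelConsumerType() return true; the topic, decoder and factory values are placeholders, and the exact set of keys StreamConfig requires may vary by release.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.spi.stream.StreamConfig;

    public class StreamConfigSketch {
      public static void main(String[] args) {
        // Hypothetical values; only the consumer type entry matters for the HLC/LLC branching.
        Map<String, String> streamConfigMap = new HashMap<>();
        streamConfigMap.put("streamType", "kafka");
        streamConfigMap.put("stream.kafka.consumer.type", "lowlevel");
        streamConfigMap.put("stream.kafka.topic.name", "myTopic");
        streamConfigMap.put("stream.kafka.decoder.class.name",
            "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
        streamConfigMap.put("stream.kafka.consumer.factory.class.name",
            "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory");

        StreamConfig streamConfig = new StreamConfig("myTable_REALTIME", streamConfigMap);
        System.out.println(streamConfig.hasLowLevelConsumerType());   // true
        System.out.println(streamConfig.hasHighLevelConsumerType());  // false
      }
    }
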
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 8b200bb..68bcf57 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -94,8 +95,9 @@ public class PinotTableIdealStateBuilder {
     return idealState;
   }
 
-  public static IdealState buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig,
-      IdealState idealState, boolean enableBatchMessageMode) {
+  public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
+      String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState,
+      boolean enableBatchMessageMode) {
 
     // Validate replicasPerPartition here.
     final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
@@ -104,7 +106,7 @@ public class PinotTableIdealStateBuilder {
     }
     final int nReplicas;
     try {
-      nReplicas = Integer.parseInt(replicasPerPartitionStr);
+      nReplicas = Integer.valueOf(replicasPerPartitionStr);
     } catch (NumberFormatException e) {
       throw new PinotHelixResourceManager.InvalidTableConfigException(
           "Invalid value for replicasPerPartition, expected a number: " + replicasPerPartitionStr, e);
@@ -112,7 +114,7 @@ public class PinotTableIdealStateBuilder {
     if (idealState == null) {
       idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode);
     }
-    return idealState;
+    pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
   }
 
   public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
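
With this change buildLowLevelRealtimeIdealStateFor no longer returns an IdealState: it builds an empty one when none exists and hands it straight to the segment manager. A hedged sketch of how a caller holding a PinotLLCRealtimeSegmentManager would invoke the new signature (variable names are assumptions, not taken from this commit):

    // Hypothetical call site inside a controller component that already holds
    // a PinotLLCRealtimeSegmentManager reference.
    PinotTableIdealStateBuilder.buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager,
        realtimeTableName, realtimeTableConfig, idealState, _enableBatchMessageMode);
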
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index cf3a401..20f79d4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -24,12 +24,12 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -84,9 +84,7 @@ import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
-import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -204,42 +202,6 @@ public class PinotLLCRealtimeSegmentManager {
     return partitionGroupMetadataList;
   }
 
-  /**
-   * Sets up the realtime table ideal state for a table of consumer type SHARDED
-   */
-  public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
-    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
-
-    String realtimeTableName = tableConfig.getTableName();
-    LOGGER.info("Setting up new SHARDED table: {}", realtimeTableName);
-
-    _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
-
-    PartitionLevelStreamConfig streamConfig =
-        new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
-
-    // get new partition groups and their metadata
-    List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList());
-    int numPartitionGroups = newPartitionGroupInfoList.size();
-
-    InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
-    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
-
-    SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
-        Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
-
-    long currentTimeMs = getCurrentTimeMs();
-    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
-    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
-      String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo,
-          currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
-      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
-          instancePartitionsMap);
-    }
-    setIdealState(realtimeTableName, idealState);
-  }
-
   public boolean getIsSplitCommitEnabled() {
     return _controllerConf.getAcceptSplitCommit();
   }
@@ -274,6 +236,50 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   /**
+   * Sets up the initial segments for a new LLC real-time table.
+   * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC and LLC are configured.
+   */
+  public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
+    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+    String realtimeTableName = tableConfig.getTableName();
+    LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
+
+    // Make sure all the existing segments are HLC segments
+    List<String> currentSegments = getAllSegments(realtimeTableName);
+    for (String segmentName : currentSegments) {
+      // TODO: Should return 4xx HTTP status code. Currently all exceptions are returning 500
+      Preconditions.checkState(SegmentName.isHighLevelConsumerSegmentName(segmentName),
+          "Cannot set up new LLC table: %s with existing non-HLC segment: %s", realtimeTableName, segmentName);
+    }
+
+    _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
+
+    PartitionLevelStreamConfig streamConfig =
+        new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
+    // get new partition groups and their metadata
+    List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList());
+    int numPartitionGroups = newPartitionGroupInfoList.size();
+
+    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+
+    SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+        Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
+
+    long currentTimeMs = getCurrentTimeMs();
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
+      String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo,
+          currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
+      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
+          instancePartitionsMap);
+    }
+    setIdealState(realtimeTableName, idealState);
+  }
+
+  /**
    * Removes all LLC segments from the given IdealState.
    */
   public void removeLLCSegments(IdealState idealState) {
@@ -498,7 +504,7 @@ public class PinotLLCRealtimeSegmentManager {
         IngestionConfigUtils.getStreamConfigMap(tableConfig));
 
     // find new partition groups [A],[B],[C],[D]
-    List<PartitionGroupInfo> newPartitionGroupMetadataList =
+    List<PartitionGroupInfo> newPartitionGroupInfoList =
         getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
 
     // create new segment metadata, only if it is not IN_PROGRESS in the current state
@@ -508,7 +514,7 @@ public class PinotLLCRealtimeSegmentManager {
     List<String> newConsumingSegmentNames = new ArrayList<>();
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
     long newSegmentCreationTimeMs = getCurrentTimeMs();
-    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) {
+    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
       int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId();
       PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
       if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
@@ -1143,14 +1149,16 @@ public class PinotLLCRealtimeSegmentManager {
     return System.currentTimeMillis();
   }
 
+  // fixme: investigate if this should only return active partitions (i.e. skip a shard if it has reached eol)
+  //  or return all unique partitions found in ideal state right from the birth of the table
   private int getNumPartitionsFromIdealState(IdealState idealState) {
-    int numPartitions = 0;
+    Set<String> uniquePartitions = new HashSet<>();
     for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
       if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-        numPartitions = Math.max(numPartitions, new LLCSegmentName(segmentName).getPartitionGroupId() + 1);
+        uniquePartitions.add(String.valueOf(new LLCSegmentName(segmentName).getPartitionGroupId()));
       }
     }
-    return numPartitions;
+    return uniquePartitions.size();
   }
 
   private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) {
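
The rewritten getNumPartitionsFromIdealState counts distinct partition group ids instead of assuming they are contiguous, which matters once shards can reach end-of-life and new ones appear with higher ids. A self-contained illustration (made-up ids, not Pinot code) of how the two approaches diverge:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class PartitionCountSketch {
      public static void main(String[] args) {
        // Suppose shards 1, 3 and 4 have ended; only partition groups 0, 2 and 5 remain in the ideal state.
        Set<Integer> partitionGroupIds = new HashSet<>(Arrays.asList(0, 2, 5));

        int uniqueCount = partitionGroupIds.size();                   // 3 (new behavior: count distinct ids)
        int maxBasedCount = Collections.max(partitionGroupIds) + 1;   // 6 (old behavior: max id + 1)

        System.out.println(uniqueCount + " vs " + maxBasedCount);
      }
    }
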
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index b0dc7eb..bb01e5c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.realtime.impl.fakestream;
 
-import java.util.Collections;
 import java.util.Set;
 import org.apache.pinot.core.util.IngestionUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -27,8 +26,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
-import org.apache.pinot.spi.stream.PartitionGroupConsumer;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -82,7 +79,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
 
     // stream metadata provider
     StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
-    int partitionCount = streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10_000).size();
+    int partitionCount = streamMetadataProvider.fetchPartitionCount(10_000);
     System.out.println(partitionCount);
 
     // Partition metadata provider
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index 61aa01f..e0b8ebd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -19,13 +19,9 @@
 package org.apache.pinot.core.realtime.impl.fakestream;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import org.apache.pinot.spi.stream.OffsetCriteria;
-import org.apache.pinot.spi.stream.PartitionGroupInfo;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -35,12 +31,10 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
  * StreamMetadataProvider implementation for the fake stream
  */
 public class FakeStreamMetadataProvider implements StreamMetadataProvider {
-  private final int _numPartitions;
-  private StreamConfig _streamConfig;
+  private int _numPartitions;
 
   public FakeStreamMetadataProvider(StreamConfig streamConfig) {
     _numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig);
-    _streamConfig = streamConfig;
   }
 
   @Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index 4503de0..b05244f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -22,8 +22,6 @@ import java.lang.reflect.Constructor;
 import java.util.Random;
 import java.util.Set;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.PartitionGroupConsumer;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
index 90dc5ad..beb82e5 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
@@ -267,7 +267,7 @@ public class KafkaPartitionLevelConsumerTest {
   }
 
   @Test
-  public void testGetPartitionCount() throws Exception {
+  public void testGetPartitionCount() {
     String streamType = "kafka";
     String streamKafkaTopicName = "theTopic";
     String streamKafkaBrokerList = "abcd:1234,bcde:2345";
@@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest {
 
     KafkaStreamMetadataProvider streamMetadataProvider =
         new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
-    Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10000), 2);
+    Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
   }
 
   @Test
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 25b1742..f9b4365 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 38c49f5..c0e2041 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -42,7 +42,6 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   }
 
   @Override
-  @Deprecated
   public int fetchPartitionCount(long timeoutMillis) {
     return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index a97f3dc..70d2c8a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -170,7 +170,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
     GetShardIteratorRequest.Builder requestBuilder =
         GetShardIteratorRequest.builder().streamName(_stream).shardId(shardId).shardIteratorType(_shardIteratorType);
 
-    if (sequenceNumber != null) {
+    if (sequenceNumber != null && _shardIteratorType.toString().contains("SEQUENCE")) {
       requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
     }
     return _kinesisClient.getShardIterator(requestBuilder.build()).shardIterator();
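
The added guard reflects a Kinesis API constraint: a starting sequence number is only accepted together with the AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER iterator types, and the request is rejected for LATEST or TRIM_HORIZON if it is set. A standalone sketch against the AWS SDK v2 (not the Pinot class above) expressing the same condition with the enum rather than a string match:

    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

    public class ShardIteratorSketch {
      static String getShardIterator(KinesisClient kinesisClient, String streamName, String shardId,
          ShardIteratorType iteratorType, String sequenceNumber) {
        GetShardIteratorRequest.Builder requestBuilder = GetShardIteratorRequest.builder()
            .streamName(streamName)
            .shardId(shardId)
            .shardIteratorType(iteratorType);
        // Attach the sequence number only for the sequence-based iterator types.
        if (sequenceNumber != null && (iteratorType == ShardIteratorType.AT_SEQUENCE_NUMBER
            || iteratorType == ShardIteratorType.AFTER_SEQUENCE_NUMBER)) {
          requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
        }
        return kinesisClient.getShardIterator(requestBuilder.build()).shardIterator();
      }
    }
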
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
deleted file mode 100644
index 7e8a911..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream;
-
-public interface FetchResult<T> {
-  Checkpoint getLastCheckpoint();
-  MessageBatch<T> getMessages();
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 5af72c0..3052b9e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.spi.stream;
 
-import javax.annotation.Nullable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 
@@ -62,7 +61,6 @@ public interface MessageBatch<T> {
    * Returns the metadata associated with the message at a particular index. This typically includes the timestamp
    * when the message was ingested by the upstream stream-provider and other relevant metadata.
    */
-  @Nullable
   default RowMetadata getMetadataAtIndex(int index) {
     return null;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java
deleted file mode 100644
index 1568d63..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream;
-
-import java.util.List;
-
-
-public interface PartitionGroupMetadataList {
-
-  List<PartitionGroupMetadata> getMetadataList();
-
-  PartitionGroupMetadata getPartitionGroupMetadata(int index);
-
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index a3e359e..d343203 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -41,7 +41,7 @@ public class StreamConfig {
    * The type of the stream consumer either HIGHLEVEL or LOWLEVEL. For backward compatibility, adding SIMPLE which is equivalent to LOWLEVEL
    */
   public enum ConsumerType {
-    HIGHLEVEL, LOWLEVEL, SHARDED
+    HIGHLEVEL, LOWLEVEL
   }
 
   public static final int DEFAULT_FLUSH_THRESHOLD_ROWS = 5_000_000;
@@ -273,10 +273,6 @@ public class StreamConfig {
     return _consumerTypes.contains(ConsumerType.LOWLEVEL);
   }
 
-  public boolean isShardedConsumerType() {
-    return _consumerTypes.size() == 1 && _consumerTypes.get(0).equals(ConsumerType.SHARDED);
-  }
-
   public String getConsumerFactoryClassName() {
     return _consumerFactoryClassName;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 572cd02..c64f710 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -36,9 +36,9 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
 public interface StreamMetadataProvider extends Closeable {
   /**
    * Fetches the number of partitions for a topic given the stream configs
-   * @deprecated use getPartitionGroupMetadataList instead
+   * @param timeoutMillis
+   * @return
    */
-  @Deprecated
   int fetchPartitionCount(long timeoutMillis);
 
   // Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0
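
With the @Deprecated annotation dropped, fetchPartitionCount is again the supported way to ask a stream for its partition count. A minimal usage sketch, assuming a valid StreamConfig and mirroring the 10_000 ms timeout used in the test changes above; StreamMetadataProvider is Closeable, so try-with-resources applies:

    import java.io.IOException;
    import org.apache.pinot.spi.stream.StreamConfig;
    import org.apache.pinot.spi.stream.StreamConsumerFactory;
    import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
    import org.apache.pinot.spi.stream.StreamMetadataProvider;

    public class PartitionCountUsageSketch {
      static int fetchPartitionCount(StreamConfig streamConfig) throws IOException {
        StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfig);
        try (StreamMetadataProvider metadataProvider = factory.createStreamMetadataProvider("clientId")) {
          return metadataProvider.fetchPartitionCount(10_000L);
        }
      }
    }
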


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org