You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/05 18:21:12 UTC
[incubator-pinot] branch sharded_consumer_type_support_with_kinesis
updated: Remove unused classes and changes
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
new 05e126b Remove unused classes and changes
05e126b is described below
commit 05e126b1cf0664cc020f424a3a4b1b19f4023205
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Tue Jan 5 10:20:43 2021 -0800
Remove unused classes and changes
---
.../apache/pinot/common/utils/CommonConstants.java | 4 -
.../helix/core/PinotHelixResourceManager.java | 61 ++++++--------
.../helix/core/PinotTableIdealStateBuilder.java | 10 ++-
.../realtime/PinotLLCRealtimeSegmentManager.java | 96 ++++++++++++----------
.../impl/fakestream/FakeStreamConsumerFactory.java | 5 +-
.../fakestream/FakeStreamMetadataProvider.java | 8 +-
...lakyConsumerRealtimeClusterIntegrationTest.java | 2 -
.../kafka09/KafkaPartitionLevelConsumerTest.java | 4 +-
.../kafka20/KafkaPartitionLevelConsumer.java | 1 -
.../kafka20/KafkaStreamMetadataProvider.java | 1 -
.../plugin/stream/kinesis/KinesisConsumer.java | 2 +-
.../org/apache/pinot/spi/stream/FetchResult.java | 24 ------
.../org/apache/pinot/spi/stream/MessageBatch.java | 2 -
.../spi/stream/PartitionGroupMetadataList.java | 30 -------
.../org/apache/pinot/spi/stream/StreamConfig.java | 6 +-
.../pinot/spi/stream/StreamMetadataProvider.java | 4 +-
16 files changed, 91 insertions(+), 169 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 7a91d8c..9773e7e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -379,10 +379,6 @@ public class CommonConstants {
public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time";
public static final String PARTITION_METADATA = "segment.partition.metadata";
/**
- * Serialized {@link org.apache.pinot.spi.stream.PartitionGroupMetadata} for this segment
- */
- public static final String PARTITION_GROUP_METADATA = "segment.partition.group.metadata";
- /**
* This field is used for parallel push protection to lock the segment globally.
* We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the
* next upload won't be blocked forever.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5388eeb..af99860 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1332,45 +1332,34 @@ public class PinotHelixResourceManager {
IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig));
IdealState idealState = getTableIdealState(realtimeTableName);
-
- if (streamConfig.isShardedConsumerType()) {
- idealState = PinotTableIdealStateBuilder
- .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
- _enableBatchMessageMode);
- _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
- LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName);
- } else {
-
- if (streamConfig.hasHighLevelConsumerType()) {
- if (idealState == null) {
- LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
- idealState = PinotTableIdealStateBuilder
- .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager,
- _propertyStore, _enableBatchMessageMode);
- _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
- } else {
- // Remove LLC segments if it is not configured
- if (!streamConfig.hasLowLevelConsumerType()) {
- _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
- }
+ if (streamConfig.hasHighLevelConsumerType()) {
+ if (idealState == null) {
+ LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
+ idealState = PinotTableIdealStateBuilder
+ .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager,
+ _propertyStore, _enableBatchMessageMode);
+ _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
+ } else {
+ // Remove LLC segments if it is not configured
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
}
- // For HLC table, property store entry must exist to trigger watchers to create segments
- ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName);
}
+ // For HLC table, property store entry must exist to trigger watchers to create segments
+ ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName);
+ }
- // Either we have only low-level consumer, or both.
- if (streamConfig.hasLowLevelConsumerType()) {
- // Will either create idealstate entry, or update the IS entry with new segments
- // (unless there are low-level segments already present)
- if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
- idealState = PinotTableIdealStateBuilder
- .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
- _enableBatchMessageMode);
- _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
- LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
- } else {
- LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
- }
+ // Either we have only low-level consumer, or both.
+ if (streamConfig.hasLowLevelConsumerType()) {
+ // Will either create idealstate entry, or update the IS entry with new segments
+ // (unless there are low-level segments already present)
+ if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
+ PinotTableIdealStateBuilder
+ .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
+ _enableBatchMessageMode);
+ LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
+ } else {
+ LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
}
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 8b200bb..68bcf57 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -94,8 +95,9 @@ public class PinotTableIdealStateBuilder {
return idealState;
}
- public static IdealState buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig,
- IdealState idealState, boolean enableBatchMessageMode) {
+ public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
+ String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState,
+ boolean enableBatchMessageMode) {
// Validate replicasPerPartition here.
final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
@@ -104,7 +106,7 @@ public class PinotTableIdealStateBuilder {
}
final int nReplicas;
try {
- nReplicas = Integer.parseInt(replicasPerPartitionStr);
+ nReplicas = Integer.valueOf(replicasPerPartitionStr);
} catch (NumberFormatException e) {
throw new PinotHelixResourceManager.InvalidTableConfigException(
"Invalid value for replicasPerPartition, expected a number: " + replicasPerPartitionStr, e);
@@ -112,7 +114,7 @@ public class PinotTableIdealStateBuilder {
if (idealState == null) {
idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode);
}
- return idealState;
+ pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
}
public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index cf3a401..20f79d4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -24,12 +24,12 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -84,9 +84,7 @@ import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
-import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -204,42 +202,6 @@ public class PinotLLCRealtimeSegmentManager {
return partitionGroupMetadataList;
}
- /**
- * Sets up the realtime table ideal state for a table of consumer type SHARDED
- */
- public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
- Preconditions.checkState(!_isStopping, "Segment manager is stopping");
-
- String realtimeTableName = tableConfig.getTableName();
- LOGGER.info("Setting up new SHARDED table: {}", realtimeTableName);
-
- _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
-
- PartitionLevelStreamConfig streamConfig =
- new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
-
- // get new partition groups and their metadata
- List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList());
- int numPartitionGroups = newPartitionGroupInfoList.size();
-
- InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
- int numReplicas = getNumReplicas(tableConfig, instancePartitions);
-
- SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
- Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
-
- long currentTimeMs = getCurrentTimeMs();
- Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
- for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
- String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo,
- currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
- instancePartitionsMap);
- }
- setIdealState(realtimeTableName, idealState);
- }
-
public boolean getIsSplitCommitEnabled() {
return _controllerConf.getAcceptSplitCommit();
}
@@ -274,6 +236,50 @@ public class PinotLLCRealtimeSegmentManager {
}
/**
+ * Sets up the initial segments for a new LLC real-time table.
+ * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC and LLC are configured.
+ */
+ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ String realtimeTableName = tableConfig.getTableName();
+ LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
+
+ // Make sure all the existing segments are HLC segments
+ List<String> currentSegments = getAllSegments(realtimeTableName);
+ for (String segmentName : currentSegments) {
+ // TODO: Should return 4xx HTTP status code. Currently all exceptions are returning 500
+ Preconditions.checkState(SegmentName.isHighLevelConsumerSegmentName(segmentName),
+ "Cannot set up new LLC table: %s with existing non-HLC segment: %s", realtimeTableName, segmentName);
+ }
+
+ _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
+
+ PartitionLevelStreamConfig streamConfig =
+ new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
+ // get new partition groups and their metadata
+ List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList());
+ int numPartitionGroups = newPartitionGroupInfoList.size();
+
+ int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+
+ SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+ Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
+
+ long currentTimeMs = getCurrentTimeMs();
+ Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+ for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
+ String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo,
+ currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
+ instancePartitionsMap);
+ }
+ setIdealState(realtimeTableName, idealState);
+ }
+
+ /**
* Removes all LLC segments from the given IdealState.
*/
public void removeLLCSegments(IdealState idealState) {
@@ -498,7 +504,7 @@ public class PinotLLCRealtimeSegmentManager {
IngestionConfigUtils.getStreamConfigMap(tableConfig));
// find new partition groups [A],[B],[C],[D]
- List<PartitionGroupInfo> newPartitionGroupMetadataList =
+ List<PartitionGroupInfo> newPartitionGroupInfoList =
getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
// create new segment metadata, only if it is not IN_PROGRESS in the current state
@@ -508,7 +514,7 @@ public class PinotLLCRealtimeSegmentManager {
List<String> newConsumingSegmentNames = new ArrayList<>();
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
long newSegmentCreationTimeMs = getCurrentTimeMs();
- for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) {
+ for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId();
PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
@@ -1143,14 +1149,16 @@ public class PinotLLCRealtimeSegmentManager {
return System.currentTimeMillis();
}
+ // fixme: investigate if this should only return active partitions (i.e. skip a shard if it has reached eol)
+ // or return all unique partitions found in ideal state right from the birth of the table
private int getNumPartitionsFromIdealState(IdealState idealState) {
- int numPartitions = 0;
+ Set<String> uniquePartitions = new HashSet<>();
for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- numPartitions = Math.max(numPartitions, new LLCSegmentName(segmentName).getPartitionGroupId() + 1);
+ uniquePartitions.add(String.valueOf(new LLCSegmentName(segmentName).getPartitionGroupId()));
}
}
- return numPartitions;
+ return uniquePartitions.size();
}
private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index b0dc7eb..bb01e5c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.realtime.impl.fakestream;
-import java.util.Collections;
import java.util.Set;
import org.apache.pinot.core.util.IngestionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -27,8 +26,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
-import org.apache.pinot.spi.stream.PartitionGroupConsumer;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -82,7 +79,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
// stream metadata provider
StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
- int partitionCount = streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10_000).size();
+ int partitionCount = streamMetadataProvider.fetchPartitionCount(10_000);
System.out.println(partitionCount);
// Partition metadata provider
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index 61aa01f..e0b8ebd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -19,13 +19,9 @@
package org.apache.pinot.core.realtime.impl.fakestream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.pinot.spi.stream.OffsetCriteria;
-import org.apache.pinot.spi.stream.PartitionGroupInfo;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -35,12 +31,10 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
* StreamMetadataProvider implementation for the fake stream
*/
public class FakeStreamMetadataProvider implements StreamMetadataProvider {
- private final int _numPartitions;
- private StreamConfig _streamConfig;
+ private int _numPartitions;
public FakeStreamMetadataProvider(StreamConfig streamConfig) {
_numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig);
- _streamConfig = streamConfig;
}
@Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index 4503de0..b05244f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -22,8 +22,6 @@ import java.lang.reflect.Constructor;
import java.util.Random;
import java.util.Set;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.PartitionGroupConsumer;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
index 90dc5ad..beb82e5 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
@@ -267,7 +267,7 @@ public class KafkaPartitionLevelConsumerTest {
}
@Test
- public void testGetPartitionCount() throws Exception {
+ public void testGetPartitionCount() {
String streamType = "kafka";
String streamKafkaTopicName = "theTopic";
String streamKafkaBrokerList = "abcd:1234,bcde:2345";
@@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest {
KafkaStreamMetadataProvider streamMetadataProvider =
new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
- Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10000), 2);
+ Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
}
@Test
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 25b1742..f9b4365 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.pinot.spi.stream.Checkpoint;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 38c49f5..c0e2041 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -42,7 +42,6 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
}
@Override
- @Deprecated
public int fetchPartitionCount(long timeoutMillis) {
return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index a97f3dc..70d2c8a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -170,7 +170,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
GetShardIteratorRequest.Builder requestBuilder =
GetShardIteratorRequest.builder().streamName(_stream).shardId(shardId).shardIteratorType(_shardIteratorType);
- if (sequenceNumber != null) {
+ if (sequenceNumber != null && _shardIteratorType.toString().contains("SEQUENCE")) {
requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
}
return _kinesisClient.getShardIterator(requestBuilder.build()).shardIterator();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
deleted file mode 100644
index 7e8a911..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream;
-
-public interface FetchResult<T> {
- Checkpoint getLastCheckpoint();
- MessageBatch<T> getMessages();
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 5af72c0..3052b9e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.spi.stream;
-import javax.annotation.Nullable;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -62,7 +61,6 @@ public interface MessageBatch<T> {
* Returns the metadata associated with the message at a particular index. This typically includes the timestamp
* when the message was ingested by the upstream stream-provider and other relevant metadata.
*/
- @Nullable
default RowMetadata getMetadataAtIndex(int index) {
return null;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java
deleted file mode 100644
index 1568d63..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataList.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream;
-
-import java.util.List;
-
-
-public interface PartitionGroupMetadataList {
-
- List<PartitionGroupMetadata> getMetadataList();
-
- PartitionGroupMetadata getPartitionGroupMetadata(int index);
-
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index a3e359e..d343203 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -41,7 +41,7 @@ public class StreamConfig {
* The type of the stream consumer either HIGHLEVEL or LOWLEVEL. For backward compatibility, adding SIMPLE which is equivalent to LOWLEVEL
*/
public enum ConsumerType {
- HIGHLEVEL, LOWLEVEL, SHARDED
+ HIGHLEVEL, LOWLEVEL
}
public static final int DEFAULT_FLUSH_THRESHOLD_ROWS = 5_000_000;
@@ -273,10 +273,6 @@ public class StreamConfig {
return _consumerTypes.contains(ConsumerType.LOWLEVEL);
}
- public boolean isShardedConsumerType() {
- return _consumerTypes.size() == 1 && _consumerTypes.get(0).equals(ConsumerType.SHARDED);
- }
-
public String getConsumerFactoryClassName() {
return _consumerFactoryClassName;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 572cd02..c64f710 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -36,9 +36,9 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
public interface StreamMetadataProvider extends Closeable {
/**
* Fetches the number of partitions for a topic given the stream configs
- * @deprecated use getPartitionGroupMetadataList instead
+ * @param timeoutMillis
+ * @return
*/
- @Deprecated
int fetchPartitionCount(long timeoutMillis);
// Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org