You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:42:58 UTC
[incubator-pinot] 08/47: An attempt at server-side changes
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 31c64a0cc138146dc59c1ce665f3ca72fd7b52f9
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Dec 31 17:19:24 2020 -0800
An attempt at server-side changes
---
.../realtime/LLRealtimeSegmentDataManager.java | 22 +++++++++++++---------
.../org/apache/pinot/spi/stream/FetchResult.java | 5 +----
.../pinot/spi/stream/PartitionGroupConsumer.java | 2 +-
3 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0938251..054676e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -71,8 +71,10 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.FetchResult;
import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.PermanentConsumerException;
import org.apache.pinot.spi.stream.RowMetadata;
@@ -249,10 +251,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private Thread _consumerThread;
private final String _streamTopic;
private final int _partitionGroupId;
+ private final PartitionGroupMetadata _partitionGroupMetadata;
final String _clientId;
private final LLCSegmentName _llcSegmentName;
private final RecordTransformer _recordTransformer;
- private PartitionLevelConsumer _partitionLevelConsumer = null;
+ private PartitionGroupConsumer _partitionGroupConsumer = null;
private StreamMetadataProvider _streamMetadataProvider = null;
private final File _resourceTmpDir;
private final String _tableNameWithType;
@@ -381,12 +384,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Update _currentOffset upon return from this method
MessageBatch messageBatch;
try {
- messageBatch = _partitionLevelConsumer
+ FetchResult fetchResult = _partitionGroupConsumer
.fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+ messageBatch = fetchResult.getMessages();
consecutiveErrorCount = 0;
- } catch (TimeoutException e) {
- handleTransientStreamErrors(e);
- continue;
} catch (TransientConsumerException e) {
handleTransientStreamErrors(e);
continue;
@@ -899,7 +900,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private void closePartitionLevelConsumer() {
try {
- _partitionLevelConsumer.close();
+ _partitionGroupConsumer.close();
} catch (Exception e) {
segmentLogger.warn("Could not close stream consumer", e);
}
@@ -1131,6 +1132,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentNameStr = _segmentZKMetadata.getSegmentName();
_llcSegmentName = llcSegmentName;
_partitionGroupId = _llcSegmentName.getPartitionGroupId();
+ _partitionGroupMetadata = new PartitionGroupMetadata(_partitionGroupId, _llcSegmentName.getSequenceNumber(),
+ _segmentZKMetadata.getStartOffset(), _segmentZKMetadata.getEndOffset(),
+ _segmentZKMetadata.getStatus().toString());
_partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
_metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId;
@@ -1311,11 +1315,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
* @param reason
*/
private void makeStreamConsumer(String reason) {
- if (_partitionLevelConsumer != null) {
+ if (_partitionGroupConsumer != null) {
closePartitionLevelConsumer();
}
segmentLogger.info("Creating new stream consumer, reason: {}", reason);
- _partitionLevelConsumer = _streamConsumerFactory.createPartitionLevelConsumer(_clientId, _partitionGroupId);
+ _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_partitionGroupMetadata);
}
/**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
index b0ed6e5..7e8a911 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
@@ -18,10 +18,7 @@
*/
package org.apache.pinot.spi.stream;
-import java.util.List;
-
-
public interface FetchResult<T> {
Checkpoint getLastCheckpoint();
- List<T> getMessages();
+ MessageBatch<T> getMessages();
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index e096e67..bbbdaad 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -22,5 +22,5 @@ import java.io.Closeable;
public interface PartitionGroupConsumer extends Closeable {
- FetchResult fetch(Checkpoint start, Checkpoint end, long timeout);
+ FetchResult fetchMessages(Checkpoint start, Checkpoint end, long timeout);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org