You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:42:58 UTC

[incubator-pinot] 08/47: An attempt at server-side changes

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 31c64a0cc138146dc59c1ce665f3ca72fd7b52f9
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Dec 31 17:19:24 2020 -0800

    An attempt at server-side changes
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 22 +++++++++++++---------
 .../org/apache/pinot/spi/stream/FetchResult.java   |  5 +----
 .../pinot/spi/stream/PartitionGroupConsumer.java   |  2 +-
 3 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0938251..054676e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -71,8 +71,10 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.FetchResult;
 import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
 import org.apache.pinot.spi.stream.RowMetadata;
@@ -249,10 +251,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private Thread _consumerThread;
   private final String _streamTopic;
   private final int _partitionGroupId;
+  private final PartitionGroupMetadata _partitionGroupMetadata;
   final String _clientId;
   private final LLCSegmentName _llcSegmentName;
   private final RecordTransformer _recordTransformer;
-  private PartitionLevelConsumer _partitionLevelConsumer = null;
+  private PartitionGroupConsumer _partitionGroupConsumer = null;
   private StreamMetadataProvider _streamMetadataProvider = null;
   private final File _resourceTmpDir;
   private final String _tableNameWithType;
@@ -381,12 +384,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       // Update _currentOffset upon return from this method
       MessageBatch messageBatch;
       try {
-        messageBatch = _partitionLevelConsumer
+        FetchResult fetchResult = _partitionGroupConsumer
             .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+        messageBatch = fetchResult.getMessages();
         consecutiveErrorCount = 0;
-      } catch (TimeoutException e) {
-        handleTransientStreamErrors(e);
-        continue;
       } catch (TransientConsumerException e) {
         handleTransientStreamErrors(e);
         continue;
@@ -899,7 +900,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   private void closePartitionLevelConsumer() {
     try {
-      _partitionLevelConsumer.close();
+      _partitionGroupConsumer.close();
     } catch (Exception e) {
       segmentLogger.warn("Could not close stream consumer", e);
     }
@@ -1131,6 +1132,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
     _partitionGroupId = _llcSegmentName.getPartitionGroupId();
+    _partitionGroupMetadata = new PartitionGroupMetadata(_partitionGroupId, _llcSegmentName.getSequenceNumber(),
+        _segmentZKMetadata.getStartOffset(), _segmentZKMetadata.getEndOffset(),
+        _segmentZKMetadata.getStatus().toString());
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
     _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId;
@@ -1311,11 +1315,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
    * @param reason
    */
   private void makeStreamConsumer(String reason) {
-    if (_partitionLevelConsumer != null) {
+    if (_partitionGroupConsumer != null) {
       closePartitionLevelConsumer();
     }
     segmentLogger.info("Creating new stream consumer, reason: {}", reason);
-    _partitionLevelConsumer = _streamConsumerFactory.createPartitionLevelConsumer(_clientId, _partitionGroupId);
+    _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_partitionGroupMetadata);
   }
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
index b0ed6e5..7e8a911 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
@@ -18,10 +18,7 @@
  */
 package org.apache.pinot.spi.stream;
 
-import java.util.List;
-
-
 public interface FetchResult<T> {
   Checkpoint getLastCheckpoint();
-  List<T> getMessages();
+  MessageBatch<T> getMessages();
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index e096e67..bbbdaad 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -22,5 +22,5 @@ import java.io.Closeable;
 
 
 public interface PartitionGroupConsumer extends Closeable {
-  FetchResult fetch(Checkpoint start, Checkpoint end, long timeout);
+  FetchResult fetchMessages(Checkpoint start, Checkpoint end, long timeout);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org