Posted to commits@pinot.apache.org by mc...@apache.org on 2020/05/11 20:35:53 UTC

[incubator-pinot] branch master updated: Introduced StreamPartitionMsgOffset class (#5360)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f686218  Introduced StreamPartitionMsgOffset class (#5360)
f686218 is described below

commit f686218fcbea46e6d4e21834980c01dbdfcf837e
Author: Subbu Subramaniam <mc...@users.noreply.github.com>
AuthorDate: Mon May 11 13:35:42 2020 -0700

    Introduced StreamPartitionMsgOffset class (#5360)
    
    This change is mostly mechanical: the server side now uses the new offset class
    instead of a long offset. The segment completion protocol and the segment ZK
    metadata still hold the offset as a long.
    
    StreamPartitionMsgOffset should eventually become an interface; for now it simply
    wraps a long value.
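
    For illustration only (not part of the commit): a minimal sketch of the pattern the
    diff adopts on the server side, where offsets are compared through the new class
    instead of as raw longs. StreamPartitionMsgOffset and its methods are taken from the
    diff below; the surrounding driver code is hypothetical.

        import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

        public class OffsetComparisonSketch {
          public static void main(String[] args) {
            // Offsets are wrapped in StreamPartitionMsgOffset instead of raw longs.
            StreamPartitionMsgOffset currentOffset = new StreamPartitionMsgOffset(100L);
            StreamPartitionMsgOffset finalOffset = new StreamPartitionMsgOffset(150L);

            // Comparisons go through compareTo(), mirroring endCriteriaReached() in
            // LLRealtimeSegmentDataManager (previously: _currentOffset == _finalOffset).
            if (currentOffset.compareTo(finalOffset) == 0) {
              System.out.println("Caught up to offset " + finalOffset);
            } else if (currentOffset.compareTo(finalOffset) > 0) {
              throw new RuntimeException("Past max offset");
            }

            // The raw long is still needed at the protocol boundary, via the
            // deprecated accessor.
            long offsetForProtocol = currentOffset.getOffset();
            System.out.println("Offset sent to controller: " + offsetForProtocol);
          }
        }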
---
 .../manager/realtime/DefaultSegmentCommitter.java  |  2 +-
 .../realtime/LLRealtimeSegmentDataManager.java     | 70 +++++++++++-----------
 .../realtime/SegmentBuildTimeLeaseExtender.java    |  5 +-
 .../data/manager/realtime/SegmentCommitter.java    |  4 +-
 .../manager/realtime/SplitSegmentCommitter.java    |  2 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java | 27 +++++++--
 .../DefaultCommitterRealtimeIntegrationTest.java   |  6 +-
 .../pinot/spi/stream/StreamPartitionMsgOffset.java | 68 +++++++++++++++++++++
 8 files changed, 136 insertions(+), 48 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java
index 67d9214..4b569ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/DefaultSegmentCommitter.java
@@ -41,7 +41,7 @@ public class DefaultSegmentCommitter implements SegmentCommitter {
   }
 
   @Override
-  public SegmentCompletionProtocol.Response commit(long currentOffset, int numRows, LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor) {
+  public SegmentCompletionProtocol.Response commit(LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor) {
     final File segmentTarFile = new File(segmentBuildDescriptor.getSegmentTarFilePath());
 
     SegmentCompletionProtocol.Response response = _protocolHandler.segmentCommit(_params, segmentTarFile);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 033a220..0daae74 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -75,6 +75,7 @@ import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamDecoderProvider;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.TransientConsumerException;
 import org.apache.pinot.core.util.SchemaUtils;
 import org.joda.time.DateTime;
@@ -142,24 +143,24 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   public class SegmentBuildDescriptor {
     final String _segmentTarFilePath;
     final Map<String, File> _metadataFileMap;
-    final long _offset;
+    final StreamPartitionMsgOffset _offset;
     final long _waitTimeMillis;
     final long _buildTimeMillis;
     final String _segmentDirPath;
     final long _segmentSizeBytes;
 
-    public SegmentBuildDescriptor(String segmentTarFilePath, Map<String, File> metadataFileMap, long offset,
+    public SegmentBuildDescriptor(String segmentTarFilePath, Map<String, File> metadataFileMap, StreamPartitionMsgOffset offset,
         String segmentDirPath, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
       _segmentTarFilePath = segmentTarFilePath;
       _metadataFileMap = metadataFileMap;
-      _offset = offset;
+      _offset = new StreamPartitionMsgOffset(offset.getOffset());
       _buildTimeMillis = buildTimeMillis;
       _waitTimeMillis = waitTimeMillis;
       _segmentDirPath = segmentDirPath;
       _segmentSizeBytes = segmentSizeBytes;
     }
 
-    public long getOffset() {
+    public StreamPartitionMsgOffset getOffset() {
       return _offset;
     }
 
@@ -217,7 +218,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final String _metricKeyName;
   private final ServerMetrics _serverMetrics;
   private final MutableSegmentImpl _realtimeSegment;
-  private volatile long _currentOffset;
+  private final StreamPartitionMsgOffset _currentOffset;
   private volatile State _state;
   private volatile int _numRowsConsumed = 0;
   private volatile int _numRowsIndexed = 0; // Can be different from _numRowsConsumed when metrics update is enabled.
@@ -232,7 +233,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
-  private volatile long _finalOffset = -1;
+  private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch up to this one
   private volatile boolean _shouldStop = false;
 
   // It takes 30s to locate controller leader, and more if there are multiple controller failures.
@@ -261,7 +262,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final String _instanceId;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
   private final long _consumeStartTime;
-  private final long _startOffset;
+  private final StreamPartitionMsgOffset _startOffset;
   private final PartitionLevelStreamConfig _partitionLevelStreamConfig;
 
   private long _lastLogTime = 0;
@@ -305,11 +306,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         // We have posted segmentConsumed() at least once, and the controller is asking us to catch up to a certain offset.
         // There is no time limit here, so just check to see that we are still within the offset we need to reach.
         // Going past the offset is an exception.
-        if (_currentOffset == _finalOffset) {
+        if (_currentOffset.compareTo(_finalOffset) == 0) {
           segmentLogger.info("Caught up to offset={}, state={}", _finalOffset, _state.toString());
           return true;
         }
-        if (_currentOffset > _finalOffset) {
+        if (_currentOffset.compareTo(_finalOffset) > 0) {
           segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), _currentOffset,
               _finalOffset);
           throw new RuntimeException("Past max offset");
@@ -320,14 +321,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         // We are attempting to go from CONSUMING to ONLINE state. We are making a last attempt to catch up to the
         // target offset. We have a time constraint, and need to stop consuming if we cannot get to the target offset
         // within that time.
-        if (_currentOffset == _finalOffset) {
+        if (_currentOffset.compareTo(_finalOffset) == 0) {
           segmentLogger.info("Caught up to offset={}, state={}", _finalOffset, _state.toString());
           return true;
         } else if (now >= _consumeEndTime) {
           segmentLogger.info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString());
           return true;
         }
-        if (_currentOffset > _finalOffset) {
+        if (_currentOffset.compareTo(_finalOffset) > 0) {
           segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), _currentOffset,
               _finalOffset);
           throw new RuntimeException("Past max offset");
@@ -360,13 +361,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     final long idlePipeSleepTimeMillis = 100;
     final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
         .getFetchTimeoutMillis());  // 3 minute count
-    long lastUpdatedOffset = _currentOffset;  // so that we always update the metric when we enter this method.
+    StreamPartitionMsgOffset lastUpdatedOffset = new StreamPartitionMsgOffset(_currentOffset.getOffset());  // so that we always update the metric when we enter this method.
     long consecutiveIdleCount = 0;
     // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
     // anymore. Remove the file if it exists.
     removeSegmentFile();
 
-    final long _endOffset = Long.MAX_VALUE; // No upper limit on stream offset
+    final StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(Long.MAX_VALUE); // No upper limit on stream offset
     segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset, _finalOffset);
     while (!_shouldStop && !endCriteriaReached()) {
       // Consume for the next readTime ms, or we get to final offset, whichever happens earlier,
@@ -374,7 +375,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       MessageBatch messageBatch;
       try {
         messageBatch = _partitionLevelConsumer
-            .fetchMessages(_currentOffset, _endOffset, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+            .fetchMessages(_currentOffset.getOffset(), endOffset.getOffset(), _partitionLevelStreamConfig.getFetchTimeoutMillis());
         consecutiveErrorCount = 0;
       } catch (TimeoutException e) {
         handleTransientStreamErrors(e);
@@ -394,13 +395,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
       processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
 
-      if (_currentOffset != lastUpdatedOffset) {
+      if (_currentOffset.compareTo(lastUpdatedOffset) != 0) {
         consecutiveIdleCount = 0;
         // We consumed something. Update the highest stream offset as well as partition-consuming metric.
-        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, _currentOffset);
-        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, _currentOffset);
+        // TODO Need to find a way to bump metrics without getting actual offset value.
+        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, _currentOffset.getOffset());
+        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, _currentOffset.getOffset());
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
-        lastUpdatedOffset = _currentOffset;
+        lastUpdatedOffset.setOffset(_currentOffset.getOffset());
       } else {
         // We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long time.
         // Create a new stream consumer wrapper, in case we are stuck on something.
@@ -487,7 +489,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
                 realtimeRowsDroppedMeter);
       }
 
-      _currentOffset = messagesAndOffsets.getNextStreamMessageOffsetAtIndex(index);
+      _currentOffset.setOffset(messagesAndOffsets.getNextStreamMessageOffsetAtIndex(index));
       _numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
       _numRowsConsumed++;
       streamMessageCount++;
@@ -534,7 +536,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           _state = State.HOLDING;
           SegmentCompletionProtocol.Response response = postSegmentConsumedMsg();
           SegmentCompletionProtocol.ControllerResponseStatus status = response.getStatus();
-          long rspOffset = response.getOffset();
+          StreamPartitionMsgOffset rspOffset = new StreamPartitionMsgOffset(response.getOffset());
           boolean success;
           switch (status) {
             case NOT_LEADER:
@@ -543,7 +545,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               hold();
               break;
             case CATCH_UP:
-              if (rspOffset <= _currentOffset) {
+              if (rspOffset.compareTo(_currentOffset) <= 0) {
                 // Something wrong with the controller. Back off and try again.
                 segmentLogger.error("Invalid catchup offset {} in controller response, current offset {}", rspOffset,
                     _currentOffset);
@@ -647,7 +649,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // built the segment successfully.
   protected void buildSegmentForCommit(long buildTimeLeaseMs) {
     try {
-      if (_segmentBuildDescriptor != null && _segmentBuildDescriptor.getOffset() == _currentOffset) {
+      if (_segmentBuildDescriptor != null && _segmentBuildDescriptor.getOffset().compareTo(_currentOffset) == 0) {
         // Double-check that we have the file, just in case.
         String segTarFile = _segmentBuildDescriptor.getSegmentTarFilePath();
         if (new File(segTarFile).exists()) {
@@ -673,7 +675,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   }
 
   @VisibleForTesting
-  protected long getCurrentOffset() {
+  protected StreamPartitionMsgOffset getCurrentOffset() {
     return _currentOffset;
   }
 
@@ -804,7 +806,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, boolean isSplitCommit) {
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
 
-    params.withSegmentName(_segmentNameStr).withOffset(_currentOffset).withNumRows(_numRowsConsumed)
+    params.withSegmentName(_segmentNameStr).withOffset(_currentOffset.getOffset()).withNumRows(_numRowsConsumed)
         .withInstanceId(_instanceId).withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
         .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
         .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
@@ -831,7 +833,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       segmentCommitter = _segmentCommitterFactory.createDefaultSegmentCommitter(params);
     }
 
-    return segmentCommitter.commit(_currentOffset, _numRowsConsumed, _segmentBuildDescriptor);
+    return segmentCommitter.commit(_segmentBuildDescriptor);
   }
 
   protected boolean buildSegmentAndReplace() {
@@ -880,7 +882,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   protected void postStopConsumedMsg(String reason) {
     do {
       SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-      params.withOffset(_currentOffset).withReason(reason).withSegmentName(_segmentNameStr).withInstanceId(_instanceId);
+      params.withOffset(_currentOffset.getOffset()).withReason(reason).withSegmentName(_segmentNameStr).withInstanceId(_instanceId);
 
       SegmentCompletionProtocol.Response response = _protocolHandler.segmentStoppedConsuming(params);
       if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED) {
@@ -896,7 +898,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Post segmentConsumed to current leader.
     // Retry maybe once if leader is not found.
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-    params.withOffset(_currentOffset).withSegmentName(_segmentNameStr).withReason(_stopReason)
+    params.withOffset(_currentOffset.getOffset()).withSegmentName(_segmentNameStr).withReason(_stopReason)
         .withNumRows(_numRowsConsumed).withInstanceId(_instanceId);
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
@@ -919,7 +921,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       // Remove the segment file before we do anything else.
       removeSegmentFile();
       _leaseExtender.removeSegment(_segmentNameStr);
-      final long endOffset = llcMetadata.getEndOffset();
+      final StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(llcMetadata.getEndOffset());
       segmentLogger
           .info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(),
               _startOffset, endOffset);
@@ -949,13 +951,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               break;
             case DEFAULT:
               // Allow catching up to the final offset, and then replace.
-              if (_currentOffset > endOffset) {
+              if (_currentOffset.compareTo(endOffset) > 0) {
                 // We moved ahead of the offset that is committed in ZK.
                 segmentLogger
                     .warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", _currentOffset,
                         endOffset);
                 downloadSegmentAndReplace(llcMetadata);
-              } else if (_currentOffset == endOffset) {
+              } else if (_currentOffset.compareTo(endOffset) == 0) {
                 segmentLogger
                     .info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset, endOffset);
                 buildSegmentAndReplace();
@@ -996,7 +998,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     return System.currentTimeMillis();
   }
 
-  private boolean catchupToFinalOffset(long endOffset, long timeoutMs) {
+  private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset, long timeoutMs) {
     _finalOffset = endOffset;
     _consumeEndTime = now() + timeoutMs;
     _state = State.CONSUMING_TO_ONLINE;
@@ -1010,7 +1012,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     } finally {
       _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
     }
-    if (_currentOffset != endOffset) {
+    if (_currentOffset.compareTo(endOffset) != 0) {
       // Timeout?
       segmentLogger.error("Could not consume up to {} (current offset {})", endOffset, _currentOffset);
       return false;
@@ -1204,8 +1206,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
 
     _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
-    _startOffset = _segmentZKMetadata.getStartOffset();
-    _currentOffset = _startOffset;
+    _startOffset = new StreamPartitionMsgOffset(_segmentZKMetadata.getStartOffset());
+    _currentOffset = new StreamPartitionMsgOffset(_startOffset.getOffset());
     _resourceTmpDir = new File(resourceDataDir, "_tmp");
     if (!_resourceTmpDir.exists()) {
       _resourceTmpDir.mkdirs();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
index fcd6422..d91488e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,10 +91,10 @@ public class SegmentBuildTimeLeaseExtender {
    * @param initialBuildTimeMs is the initial time budget that SegmentCompletionManager has allocated.
    * @param offset The offset at which this segment is being built.
    */
-  public void addSegment(String segmentId, long initialBuildTimeMs, long offset) {
+  public void addSegment(String segmentId, long initialBuildTimeMs, StreamPartitionMsgOffset offset) {
     final long initialDelayMs = initialBuildTimeMs * 9 / 10;
     final SegmentCompletionProtocol.Request.Params reqParams = new SegmentCompletionProtocol.Request.Params();
-    reqParams.withOffset(offset).withSegmentName(segmentId).withExtraTimeSec(EXTRA_TIME_SECONDS)
+    reqParams.withOffset(offset.getOffset()).withSegmentName(segmentId).withExtraTimeSec(EXTRA_TIME_SECONDS)
         .withInstanceId(_instanceId);
     Future future = _executor
         .scheduleWithFixedDelay(new LeaseExtender(reqParams), initialDelayMs, REPEAT_REQUEST_PERIOD_SEC * 1000L,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java
index c298496..922c54b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java
@@ -27,10 +27,8 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 public interface SegmentCommitter {
   /**
    * Commits a realtime segment to persistent store
-   * @param currentOffset current offset in the stream
-   * @param numRows num rows consumed in the segment being committed
    * @param segmentBuildDescriptor object that describes segment to be committed
    * @return
    */
-  SegmentCompletionProtocol.Response commit(long currentOffset, int numRows, LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor);
+  SegmentCompletionProtocol.Response commit(LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
index 53bc82d..608a066 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
@@ -45,7 +45,7 @@ public class SplitSegmentCommitter implements SegmentCommitter {
   }
 
   @Override
-  public SegmentCompletionProtocol.Response commit(long currentOffset, int numRowsConsumed, LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor) {
+  public SegmentCompletionProtocol.Response commit(LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor) {
     final File segmentTarFile = new File(segmentBuildDescriptor.getSegmentTarFilePath());
 
     SegmentCompletionProtocol.Response segmentCommitStartResponse = _protocolHandler.segmentCommitStart(_params);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 4e41c3f..a682c85 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -57,8 +58,7 @@ import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-
-// TODO Write more tests for other parts of the class
+// TODO Re-write this test using the stream abstraction
 public class LLRealtimeSegmentDataManagerTest {
   private static final String _segmentDir = "/tmp/" + LLRealtimeSegmentDataManagerTest.class.getSimpleName();
   private static final File _segmentDirFile = new File(_segmentDir);
@@ -862,7 +862,7 @@ public class LLRealtimeSegmentDataManagerTest {
     }
 
     public void setCurrentOffset(long offset) {
-      setLong(offset, "_currentOffset");
+      setOffset(offset, "_currentOffset");
     }
 
     public void setConsumeEndTime(long endTime) {
@@ -878,7 +878,7 @@ public class LLRealtimeSegmentDataManagerTest {
     }
 
     public void setFinalOffset(long offset) {
-      setLong(offset, "_finalOffset");
+      setOffset(offset, "_finalOffset");
     }
 
     public boolean invokeEndCriteriaReached() {
@@ -914,7 +914,24 @@ public class LLRealtimeSegmentDataManagerTest {
       }
     }
 
-    private void setInt(int value, String fieldName) {
+    private void setOffset(long value, String fieldName) {
+      try {
+        Field field = LLRealtimeSegmentDataManager.class.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        StreamPartitionMsgOffset offset = (StreamPartitionMsgOffset)field.get(this);
+        if (offset == null) {
+          field.set(this, new StreamPartitionMsgOffset(value));
+        } else {
+          offset.setOffset(value);
+        }
+      } catch (NoSuchFieldException e) {
+        Assert.fail();
+      } catch (IllegalAccessException e) {
+        Assert.fail();
+      }
+    }
+
+    private void setInt(int value, String fieldName) {
       try {
         Field field = LLRealtimeSegmentDataManager.class.getDeclaredField(fieldName);
         field.setAccessible(true);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
index c2d4101..ea12bc5 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -116,9 +117,10 @@ public class DefaultCommitterRealtimeIntegrationTest extends RealtimeClusterInte
     TarGzCompressionUtils.createTarGzOfDirectory(_realtimeSegmentUntarred.getAbsolutePath());
 
     // SegmentBuildDescriptor is currently not a static class, so we will mock this object.
+    StreamPartitionMsgOffset endOffset = new StreamPartitionMsgOffset(END_OFFSET);
     when(segmentBuildDescriptor.getSegmentTarFilePath()).thenReturn(_realtimeSegmentUntarred + TARGZ_SUFFIX);
     when(segmentBuildDescriptor.getBuildTimeMillis()).thenReturn(0L);
-    when(segmentBuildDescriptor.getOffset()).thenReturn(END_OFFSET);
+    when(segmentBuildDescriptor.getOffset()).thenReturn(endOffset);
     when(segmentBuildDescriptor.getSegmentSizeBytes()).thenReturn(0L);
     when(segmentBuildDescriptor.getWaitTimeMillis()).thenReturn(0L);
 
@@ -134,7 +136,7 @@ public class DefaultCommitterRealtimeIntegrationTest extends RealtimeClusterInte
 
     SegmentCommitterFactory segmentCommitterFactory = new SegmentCommitterFactory(LOGGER, protocolHandler);
     SegmentCommitter segmentCommitter = segmentCommitterFactory.createDefaultSegmentCommitter(params);
-    segmentCommitter.commit(END_OFFSET, 3, segmentBuildDescriptor);
+    segmentCommitter.commit(segmentBuildDescriptor);
   }
 
   public void buildSegment()
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
new file mode 100644
index 0000000..e571b91
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import org.apache.pinot.spi.annotations.InterfaceStability;
+
+/**
+ * This is a class for now (so we can make one round of changes to the code).
+ * It will evolve to an interface later with different implementations for the
+ * streams. Each stream needs to provide its own serde of an offset.
+ *
+ * For now, we will take toString as a serializer.
+ * Must be thread-safe for multiple readers and one writer. Readers could get the offset
+ * and the writer can update it.
+ */
+@InterfaceStability.Evolving
+public class StreamPartitionMsgOffset implements Comparable {
+  private volatile long _offset;
+
+  /**
+   * This constructor will go away when this becomes an interface.
+   * @param offset
+   */
+  @Deprecated
+  public StreamPartitionMsgOffset(long offset) {
+    _offset = offset;
+  }
+
+  @Deprecated
+  public void setOffset(long offset) {
+    _offset = offset;
+  }
+
+  @Override
+  public int compareTo(Object other) {
+    return Long.compare(_offset, ((StreamPartitionMsgOffset)other)._offset);
+  }
+
+  @Override
+  public String toString() {
+    return Long.toString(_offset);
+  }
+
+  /**
+   * Once we fix the protocol to use serialized offsets, this should go away.
+   * @return
+   */
+  @Deprecated
+  public long getOffset() {
+    return _offset;
+  }
+}
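
A hedged sketch, not part of this commit: the javadoc in StreamPartitionMsgOffset.java
above says the class will later evolve into an interface, with each stream providing its
own serde and toString() acting as the serializer for now. Under those assumptions, a
stream-specific implementation could look roughly like this; the interface name, the
implementation name, and the serialize() method are all hypothetical.

    package org.apache.pinot.spi.stream;

    // Hypothetical future shape once StreamPartitionMsgOffset becomes an interface.
    interface PartitionMsgOffsetSketch extends Comparable<PartitionMsgOffsetSketch> {
      // Each stream provides its own serde; toString() is the serializer for now.
      String serialize();
    }

    // A long-based implementation, roughly what today's class wraps.
    class LongMsgOffsetSketch implements PartitionMsgOffsetSketch {
      private final long _offset;

      LongMsgOffsetSketch(long offset) {
        _offset = offset;
      }

      @Override
      public int compareTo(PartitionMsgOffsetSketch other) {
        return Long.compare(_offset, ((LongMsgOffsetSketch) other)._offset);
      }

      @Override
      public String serialize() {
        return Long.toString(_offset);
      }

      @Override
      public String toString() {
        return serialize();
      }
    }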


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org