Posted to commits@pinot.apache.org by ki...@apache.org on 2021/11/25 19:14:55 UTC

[pinot] branch pub-sub created (now a485eaf)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a change to branch pub-sub
in repository https://gitbox.apache.org/repos/asf/pinot.git.


      at a485eaf  Interface changes needed to support pub-sub

This branch includes the following new commits:

     new a485eaf  Interface changes needed to support pub-sub

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[pinot] 01/01: Interface changes needed to support pub-sub


kishoreg pushed a commit to branch pub-sub
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit a485eaf0a462a6003d7313d4417a1ad17311b63f
Author: kishoreg <g....@gmail.com>
AuthorDate: Thu Nov 25 09:14:05 2021 -1000

    Interface changes needed to support pub-sub
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 341 ++++++++++++++-------
 .../pinot/spi/stream/PartitionGroupConsumer.java   |  19 +-
 2 files changed, 247 insertions(+), 113 deletions(-)
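
Beyond re-wrapping long lines, the substantive changes in this commit are:
PartitionGroupConsumer gains start()/commit()/rollback() lifecycle hooks, the
COMMIT path in LLRealtimeSegmentDataManager now drives commit() and rollback(),
and the consumer clientId now includes the instanceId. For orientation, the
PartitionGroupConsumer SPI after this patch, assembled from the hunks below
(the explanatory comments are added here, not part of the commit), looks like:

  public interface PartitionGroupConsumer extends Closeable {

    // New lifecycle hook; a no-op by default. Implementations can use it to
    // set up state before the first fetch from startOffset.
    default void start(StreamPartitionMsgOffset startOffset) {
    }

    // Unchanged contract: fetch messages and offsets from the stream
    // partition group, waiting at most timeoutMs.
    MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset,
        StreamPartitionMsgOffset endOffset, int timeoutMs)
        throws TimeoutException;

    // Invoked when the controller asks this server to commit. May return an
    // adjusted end offset for the segment, or null to keep currentOffset.
    default StreamPartitionMsgOffset commit(StreamPartitionMsgOffset currentOffset) {
      return null;
    }

    // Invoked when the segment commit does not succeed, so the consumer can
    // undo any side effects of commit().
    default void rollback() {
    }
  }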

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index a36935d..a7dc545 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -94,11 +94,12 @@ import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * Segment data manager for low level consumer realtime segments, which manages consumption and segment completion.
+ * Segment data manager for low level consumer realtime segments, which manages consumption and
+ * segment completion.
  */
 public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
+
   protected enum State {
     // The state machine starts off with this state. While in this state we consume stream events
     // and index them in memory. We continue to be in this state until the end criteria is satisfied
@@ -140,11 +141,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     ERROR;
 
     public boolean shouldConsume() {
-      return this.equals(INITIAL_CONSUMING) || this.equals(CATCHING_UP) || this.equals(CONSUMING_TO_ONLINE);
+      return this.equals(INITIAL_CONSUMING) || this.equals(CATCHING_UP) || this
+          .equals(CONSUMING_TO_ONLINE);
     }
 
     public boolean isFinal() {
-      return this.equals(ERROR) || this.equals(COMMITTED) || this.equals(RETAINED) || this.equals(DISCARDED);
+      return this.equals(ERROR) || this.equals(COMMITTED) || this.equals(RETAINED) || this
+          .equals(DISCARDED);
     }
   }
 
@@ -152,6 +155,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   @VisibleForTesting
   public class SegmentBuildDescriptor {
+
     final File _segmentTarFile;
     final Map<String, File> _metadataFileMap;
     final StreamPartitionMsgOffset _offset;
@@ -159,8 +163,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     final long _buildTimeMillis;
     final long _segmentSizeBytes;
 
-    public SegmentBuildDescriptor(@Nullable File segmentTarFile, @Nullable Map<String, File> metadataFileMap,
-        StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
+    public SegmentBuildDescriptor(@Nullable File segmentTarFile,
+        @Nullable Map<String, File> metadataFileMap,
+        StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis,
+        long segmentSizeBytes) {
       _segmentTarFile = segmentTarFile;
       _metadataFileMap = metadataFileMap;
       _offset = _streamPartitionMsgOffsetFactory.create(offset);
@@ -301,23 +307,27 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         // the max time we are allowed to consume.
         if (now >= _consumeEndTime) {
           if (_realtimeSegment.getNumDocsIndexed() == 0) {
-            _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
+            _segmentLogger.info("No events came in, extending time by {} hours",
+                TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
             _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
             return false;
           }
           _segmentLogger
-              .info("Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}",
+              .info(
+                  "Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}",
                   _startTimeMs, now, _numRowsConsumed, _numRowsIndexed);
           _stopReason = SegmentCompletionProtocol.REASON_TIME_LIMIT;
           return true;
         } else if (_numRowsIndexed >= _segmentMaxRowCount) {
-          _segmentLogger.info("Stopping consumption due to row limit nRows={} numRowsIndexed={}, numRowsConsumed={}",
+          _segmentLogger.info(
+              "Stopping consumption due to row limit nRows={} numRowsIndexed={}, numRowsConsumed={}",
               _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
           _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
           return true;
         } else if (_endOfPartitionGroup) {
-          _segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, "
-              + "numRowsConsumed={}", _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+          _segmentLogger.info(
+              "Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, "
+                  + "numRowsConsumed={}", _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
           _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
           return true;
         }
@@ -334,7 +344,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           return true;
         }
         if (_currentOffset.compareTo(_finalOffset) > 0) {
-          _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), _currentOffset,
+          _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(),
+              _currentOffset,
               _finalOffset);
           throw new RuntimeException("Past max offset");
         }
@@ -348,11 +359,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           _segmentLogger.info("Caught up to offset={}, state={}", _finalOffset, _state.toString());
           return true;
         } else if (now >= _consumeEndTime) {
-          _segmentLogger.info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString());
+          _segmentLogger
+              .info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString());
           return true;
         }
         if (_currentOffset.compareTo(_finalOffset) > 0) {
-          _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), _currentOffset,
+          _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(),
+              _currentOffset,
               _finalOffset);
           throw new RuntimeException("Past max offset");
         }
@@ -367,12 +380,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       throws Exception {
     _consecutiveErrorCount++;
     if (_consecutiveErrorCount > MAX_CONSECUTIVE_ERROR_COUNT) {
-      _segmentLogger.warn("Stream transient exception when fetching messages, stopping consumption after {} attempts",
+      _segmentLogger.warn(
+          "Stream transient exception when fetching messages, stopping consumption after {} attempts",
           _consecutiveErrorCount, e);
       throw e;
     } else {
       _segmentLogger
-          .warn("Stream transient exception when fetching messages, retrying (count={})", _consecutiveErrorCount, e);
+          .warn("Stream transient exception when fetching messages, retrying (count={})",
+              _consecutiveErrorCount, e);
       Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
       makeStreamConsumer("Too many transient errors");
     }
@@ -382,8 +397,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       throws Exception {
     _numRowsErrored = 0;
     final long idlePipeSleepTimeMillis = 100;
-    final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
-        .getFetchTimeoutMillis());  // 3 minute count
+    final long maxIdleCountBeforeStatUpdate =
+        (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
+            .getFetchTimeoutMillis());  // 3 minute count
     StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory
         .create(_currentOffset);  // so that we always update the metric when we enter this method.
     long consecutiveIdleCount = 0;
@@ -391,14 +407,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // anymore. Remove the file if it exists.
     removeSegmentFile();
 
-    _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset, _finalOffset);
+    _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset,
+        _finalOffset);
     while (!_shouldStop && !endCriteriaReached()) {
       // Consume for the next readTime ms, or we get to final offset, whichever happens earlier,
       // Update _currentOffset upon return from this method
       MessageBatch messageBatch;
       try {
         messageBatch = _partitionGroupConsumer
-            .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+            .fetchMessages(_currentOffset, null,
+                _partitionLevelStreamConfig.getFetchTimeoutMillis());
         _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
         _consecutiveErrorCount = 0;
       } catch (TimeoutException e) {
@@ -408,7 +426,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         handleTransientStreamErrors(e);
         continue;
       } catch (PermanentConsumerException e) {
-        _segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", e);
+        _segmentLogger
+            .warn("Permanent exception from stream when fetching messages, stopping consumption",
+                e);
         throw e;
       } catch (Exception e) {
         // Unknown exception from stream. Treat as a transient exception.
@@ -435,7 +455,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         // Create a new stream consumer wrapper, in case we are stuck on something.
         consecutiveIdleCount++;
         if (consecutiveIdleCount > maxIdleCountBeforeStatUpdate) {
-          _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
+          _serverMetrics
+              .setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
           consecutiveIdleCount = 0;
           makeStreamConsumer("Idle for too long");
         }
@@ -443,8 +464,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
 
     if (_numRowsErrored > 0) {
-      _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
-      _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
+      _serverMetrics
+          .addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
+      _serverMetrics
+          .addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
     }
     return true;
   }
@@ -479,7 +502,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         //    Throw an exception.
         //
         _segmentLogger
-            .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed, _numRowsIndexed,
+            .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed,
+                _numRowsIndexed,
                 _segmentMaxRowCount);
         throw new RuntimeException("Realtime segment full");
       }
@@ -491,7 +515,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);
 
       GenericRow decodedRow = _messageDecoder
-          .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
+          .decode(messagesAndOffsets.getMessageAtIndex(index),
+              messagesAndOffsets.getMessageOffsetAtIndex(index),
               messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
       if (decodedRow != null) {
         try {
@@ -500,7 +525,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             decodedRow = _complexTypeTransformer.transform(decodedRow);
           }
           if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
-            for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+            for (Object singleRow : (Collection) decodedRow
+                .getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
               GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
               if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
                 realtimeRowsConsumedMeter = _serverMetrics
@@ -510,7 +536,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
                 canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
               } else {
                 realtimeRowsDroppedMeter = _serverMetrics
-                    .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                    .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED,
+                        1,
                         realtimeRowsDroppedMeter);
               }
             }
@@ -524,16 +551,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
             } else {
               realtimeRowsDroppedMeter = _serverMetrics
-                  .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                  .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED,
+                      1,
                       realtimeRowsDroppedMeter);
             }
           }
         } catch (Exception e) {
-          String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
+          String errorMessage = String
+              .format("Caught exception while transforming the record: %s", decodedRow);
           _segmentLogger.error(errorMessage, e);
           _numRowsErrored++;
           _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+              .addSegmentError(_segmentNameStr,
+                  new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
         }
       } else {
         realtimeRowsDroppedMeter = _serverMetrics
@@ -548,7 +578,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     updateCurrentDocumentCountMetrics();
     if (streamMessageCount != 0) {
-      _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount,
+      _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
+          indexedMessageCount,
           streamMessageCount, _currentOffset);
     } else {
       // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
@@ -557,6 +588,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   }
 
   public class PartitionConsumer implements Runnable {
+
     public void run() {
       long initialConsumptionEnd = 0L;
       long lastCatchUpStart = 0L;
@@ -567,7 +599,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           if (_state.shouldConsume()) {
             consumeLoop();  // Consume until we reached the end criteria, or we are stopped.
           }
-          _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+          _serverMetrics
+              .setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
           if (_shouldStop) {
             break;
           }
@@ -580,7 +613,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           } else if (_state == State.CATCHING_UP) {
             catchUpTimeMillis += now() - lastCatchUpStart;
             _serverMetrics
-                .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
+                .setValueOfTableGauge(_metricKeyName,
+                    ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
                     TimeUnit.MILLISECONDS.toSeconds(catchUpTimeMillis));
           }
 
@@ -599,8 +633,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             case CATCH_UP:
               if (rspOffset.compareTo(_currentOffset) <= 0) {
                 // Something wrong with the controller. Back off and try again.
-                _segmentLogger.error("Invalid catchup offset {} in controller response, current offset {}", rspOffset,
-                    _currentOffset);
+                _segmentLogger
+                    .error("Invalid catchup offset {} in controller response, current offset {}",
+                        rspOffset,
+                        _currentOffset);
                 hold();
               } else {
                 _state = State.CATCHING_UP;
@@ -631,7 +667,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
                     // Could not build segment for some reason. We can only download it.
                     _state = State.ERROR;
                     _realtimeTableDataManager.addSegmentError(_segmentNameStr,
-                        new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", null));
+                        new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment",
+                            null));
                   }
                   break;
                 default:
@@ -640,13 +677,21 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               break;
             case COMMIT:
               _state = State.COMMITTING;
+              StreamPartitionMsgOffset endOffset = _partitionGroupConsumer.commit(_currentOffset);
+              if (endOffset != null) {
+                _currentOffset = endOffset;
+              }
+
               long buildTimeSeconds = response.getBuildTimeSeconds();
+
               buildSegmentForCommit(buildTimeSeconds * 1000L);
+              boolean commitSuccess = false;
               if (_segmentBuildDescriptor == null) {
                 // We could not build the segment. Go into error state.
                 _state = State.ERROR;
                 _realtimeTableDataManager.addSegmentError(_segmentNameStr,
-                    new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", null));
+                    new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment",
+                        null));
               } else {
                 success = commitSegment(response.getControllerVipUrl(),
                     response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit());
@@ -660,9 +705,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
                   hold();
                 }
               }
+              if (_state != State.COMMITTED) {
+                _partitionGroupConsumer.rollback();
+              }
+
               break;
             default:
-              _segmentLogger.error("Holding after response from Controller: {}", response.toJsonString());
+              _segmentLogger
+                  .error("Holding after response from Controller: {}", response.toJsonString());
               hold();
               break;
           }
@@ -673,7 +723,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         postStopConsumedMsg(e.getClass().getName());
         _state = State.ERROR;
         _realtimeTableDataManager
-            .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+            .addSegmentError(_segmentNameStr,
+                new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
         return;
       }
@@ -682,7 +733,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
       if (initialConsumptionEnd != 0L) {
         _serverMetrics
-            .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
+            .setValueOfTableGauge(_metricKeyName,
+                ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
                 TimeUnit.MILLISECONDS.toSeconds(now() - initialConsumptionEnd));
       }
       // There is a race condition that the destroy() method can be called which ends up calling stop on the consumer.
@@ -703,7 +755,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private CompletionMode getSegmentCompletionMode() {
     CompletionConfig completionConfig = _tableConfig.getValidationConfig().getCompletionConfig();
     if (completionConfig != null) {
-      if (CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode())) {
+      if (CompletionMode.DOWNLOAD.toString()
+          .equalsIgnoreCase(completionConfig.getCompletionMode())) {
         return CompletionMode.DOWNLOAD;
       }
     }
@@ -724,7 +777,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // built the segment successfully.
   protected void buildSegmentForCommit(long buildTimeLeaseMs) {
     try {
-      if (_segmentBuildDescriptor != null && _segmentBuildDescriptor.getOffset().compareTo(_currentOffset) == 0) {
+      if (_segmentBuildDescriptor != null
+          && _segmentBuildDescriptor.getOffset().compareTo(_currentOffset) == 0) {
         // Double-check that we have the file, just in case.
         File segmentTarFile = _segmentBuildDescriptor.getSegmentTarFile();
         if (segmentTarFile != null && segmentTarFile.exists()) {
@@ -734,7 +788,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       removeSegmentFile();
       if (buildTimeLeaseMs <= 0) {
         if (_segBuildSemaphore == null) {
-          buildTimeLeaseMs = SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds() * 1000L;
+          buildTimeLeaseMs =
+              SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds() * 1000L;
         } else {
           // We know we are going to use a semaphore to limit number of segment builds, and could be
           // blocked for a long time. The controller has not provided a lease time, so set one to
@@ -806,7 +861,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" + _segmentNameStr + "-" + now());
       // lets convert the segment now
       RealtimeSegmentConverter converter =
-          new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema,
+          new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(),
+              _schema,
               _tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(), _sortedColumn,
               _invertedIndexColumns, _textIndexColumns, _fstIndexColumns, _noDictionaryColumns,
               _varLengthDictionaryColumns, _nullHandlingEnabled);
@@ -821,7 +877,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       final long buildTimeMillis = now() - lockAcquireTimeMillis;
       final long waitTimeMillis = lockAcquireTimeMillis - startTimeMillis;
       _segmentLogger
-          .info("Successfully built segment in {} ms, after lockWaitTime {} ms", buildTimeMillis, waitTimeMillis);
+          .info("Successfully built segment in {} ms, after lockWaitTime {} ms", buildTimeMillis,
+              waitTimeMillis);
 
       File dataDir = new File(_resourceDataDir);
       File indexDir = new File(dataDir, _segmentNameStr);
@@ -834,55 +891,67 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         FileUtils.moveDirectory(tempIndexDir, indexDir);
       } catch (IOException e) {
         String errorMessage =
-            String.format("Caught exception while moving index directory from: %s to: %s", tempIndexDir, indexDir);
+            String.format("Caught exception while moving index directory from: %s to: %s",
+                tempIndexDir, indexDir);
         _segmentLogger.error(errorMessage, e);
         _realtimeTableDataManager
-            .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+            .addSegmentError(_segmentNameStr,
+                new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
         return null;
       } finally {
         FileUtils.deleteQuietly(tempSegmentFolder);
       }
 
       long segmentSizeBytes = FileUtils.sizeOfDirectory(indexDir);
-      _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
+      _serverMetrics.setValueOfTableGauge(_metricKeyName,
+          ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
           TimeUnit.MILLISECONDS.toSeconds(buildTimeMillis));
-      _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
+      _serverMetrics.setValueOfTableGauge(_metricKeyName,
+          ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
           TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
 
       if (forCommit) {
-        File segmentTarFile = new File(dataDir, _segmentNameStr + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        File segmentTarFile = new File(dataDir,
+            _segmentNameStr + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
         try {
           TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
         } catch (IOException e) {
           String errorMessage =
-              String.format("Caught exception while taring index directory from: %s to: %s", indexDir, segmentTarFile);
+              String
+                  .format("Caught exception while taring index directory from: %s to: %s", indexDir,
+                      segmentTarFile);
           _segmentLogger.error(errorMessage, e);
           _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+              .addSegmentError(_segmentNameStr,
+                  new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
           return null;
         }
 
         File metadataFile = SegmentDirectoryPaths.findMetadataFile(indexDir);
         if (metadataFile == null) {
           _segmentLogger
-              .error("Failed to find file: {} under index directory: {}", V1Constants.MetadataKeys.METADATA_FILE_NAME,
+              .error("Failed to find file: {} under index directory: {}",
+                  V1Constants.MetadataKeys.METADATA_FILE_NAME,
                   indexDir);
           return null;
         }
         File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
         if (creationMetaFile == null) {
           _segmentLogger
-              .error("Failed to find file: {} under index directory: {}", V1Constants.SEGMENT_CREATION_META, indexDir);
+              .error("Failed to find file: {} under index directory: {}",
+                  V1Constants.SEGMENT_CREATION_META, indexDir);
           return null;
         }
         Map<String, File> metadataFiles = new HashMap<>();
         metadataFiles.put(V1Constants.MetadataKeys.METADATA_FILE_NAME, metadataFile);
         metadataFiles.put(V1Constants.SEGMENT_CREATION_META, creationMetaFile);
 
-        return new SegmentBuildDescriptor(segmentTarFile, metadataFiles, _currentOffset, buildTimeMillis,
+        return new SegmentBuildDescriptor(segmentTarFile, metadataFiles, _currentOffset,
+            buildTimeMillis,
             waitTimeMillis, segmentSizeBytes);
       } else {
-        return new SegmentBuildDescriptor(null, null, _currentOffset, buildTimeMillis, waitTimeMillis,
+        return new SegmentBuildDescriptor(null, null, _currentOffset, buildTimeMillis,
+            waitTimeMillis,
             segmentSizeBytes);
       }
     } catch (InterruptedException e) {
@@ -904,7 +973,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     SegmentCompletionProtocol.Response commitResponse = commit(controllerVipUrl, isSplitCommit);
 
-    if (!commitResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
+    if (!commitResponse.getStatus()
+        .equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
       return false;
     }
 
@@ -913,7 +983,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     return true;
   }
 
-  protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, boolean isSplitCommit) {
+  protected SegmentCompletionProtocol.Response commit(String controllerVipUrl,
+      boolean isSplitCommit) {
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
 
     params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
@@ -927,7 +998,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
     SegmentCommitter segmentCommitter;
     try {
-      segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(isSplitCommit, params, controllerVipUrl);
+      segmentCommitter = _segmentCommitterFactory
+          .createSegmentCommitter(isSplitCommit, params, controllerVipUrl);
     } catch (URISyntaxException e) {
       _segmentLogger.error("Failed to create a segment committer: ", e);
       return SegmentCompletionProtocol.RESP_NOT_SENT;
@@ -971,12 +1043,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   /**
    * Cleans up the metrics that reflects the state of the realtime segment.
-   * This step is essential as the instance may not be the target location for some of the partitions.
-   * E.g. if the number of partitions increases, or a host swap is needed, the target location for some partitions
+   * This step is essential as the instance may not be the target location for some of the
+   * partitions.
+   * E.g. if the number of partitions increases, or a host swap is needed, the target location for
+   * some partitions
    * may change,
-   * and the current host remains to run. In this case, the current server would still keep the state of the old
+   * and the current host remains to run. In this case, the current server would still keep the
+   * state of the old
    * partitions,
-   * which no longer resides in this host any more, thus causes false positive information to the metric system.
+   * which no longer resides in this host any more, thus causes false positive information to the
+   * metric system.
    */
   private void cleanupMetrics() {
     _serverMetrics.removeTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING);
@@ -994,10 +1070,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   protected void postStopConsumedMsg(String reason) {
     do {
       SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-      params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason).withSegmentName(_segmentNameStr)
+      params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason)
+          .withSegmentName(_segmentNameStr)
           .withInstanceId(_instanceId);
 
-      SegmentCompletionProtocol.Response response = _protocolHandler.segmentStoppedConsuming(params);
+      SegmentCompletionProtocol.Response response = _protocolHandler
+          .segmentStoppedConsuming(params);
       if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED) {
         _segmentLogger.info("Got response {}", response.toJsonString());
         break;
@@ -1036,7 +1114,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       final StreamPartitionMsgOffset endOffset =
           _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
       _segmentLogger
-          .info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(),
+          .info(
+              "State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})",
+              _state.toString(),
               _startOffset, endOffset);
       stop();
       _segmentLogger.info("Consumer thread stopped in state {}", _state.toString());
@@ -1058,8 +1138,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           CompletionMode segmentCompletionMode = getSegmentCompletionMode();
           switch (segmentCompletionMode) {
             case DOWNLOAD:
-              _segmentLogger.info("State {}. CompletionMode {}. Downloading to replace", _state.toString(),
-                  segmentCompletionMode);
+              _segmentLogger
+                  .info("State {}. CompletionMode {}. Downloading to replace", _state.toString(),
+                      segmentCompletionMode);
               downloadSegmentAndReplace(segmentZKMetadata);
               break;
             case DEFAULT:
@@ -1067,23 +1148,28 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               if (_currentOffset.compareTo(endOffset) > 0) {
                 // We moved ahead of the offset that is committed in ZK.
                 _segmentLogger
-                    .warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", _currentOffset,
+                    .warn("Current offset {} ahead of the offset in zk {}. Downloading to replace",
+                        _currentOffset,
                         endOffset);
                 downloadSegmentAndReplace(segmentZKMetadata);
               } else if (_currentOffset.compareTo(endOffset) == 0) {
                 _segmentLogger
-                    .info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset, endOffset);
+                    .info("Current offset {} matches offset in zk {}. Replacing segment",
+                        _currentOffset, endOffset);
                 buildSegmentAndReplace();
               } else {
-                _segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset, endOffset);
+                _segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset,
+                    endOffset);
                 boolean success = catchupToFinalOffset(endOffset,
-                    TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS));
+                    TimeUnit.MILLISECONDS
+                        .convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS));
                 if (success) {
                   _segmentLogger.info("Caught up to offset {}", _currentOffset);
                   buildSegmentAndReplace();
                 } else {
                   _segmentLogger
-                      .info("Could not catch up to offset (current = {}). Downloading to replace", _currentOffset);
+                      .info("Could not catch up to offset (current = {}). Downloading to replace",
+                          _currentOffset);
                   downloadSegmentAndReplace(segmentZKMetadata);
                 }
               }
@@ -1093,7 +1179,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           }
           break;
         default:
-          _segmentLogger.info("Downloading to replace segment while in state {}", _state.toString());
+          _segmentLogger
+              .info("Downloading to replace segment while in state {}", _state.toString());
           downloadSegmentAndReplace(segmentZKMetadata);
           break;
       }
@@ -1107,7 +1194,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata) {
     closeStreamConsumers();
     _realtimeTableDataManager
-        .downloadAndReplaceSegment(_segmentNameStr, segmentZKMetadata, _indexLoadingConfig, _tableConfig);
+        .downloadAndReplaceSegment(_segmentNameStr, segmentZKMetadata, _indexLoadingConfig,
+            _tableConfig);
   }
 
   protected long now() {
@@ -1130,7 +1218,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     if (_currentOffset.compareTo(endOffset) != 0) {
       // Timeout?
-      _segmentLogger.error("Could not consume up to {} (current offset {})", endOffset, _currentOffset);
+      _segmentLogger
+          .error("Could not consume up to {} (current offset {})", endOffset, _currentOffset);
       return false;
     }
 
@@ -1175,9 +1264,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // Assume that this is called only on OFFLINE to CONSUMING transition.
   // If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here.
   public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
-      RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
+      RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir,
+      IndexLoadingConfig indexLoadingConfig,
       Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore,
-      ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+      ServerMetrics serverMetrics,
+      @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = segmentZKMetadata;
     _tableConfig = tableConfig;
@@ -1190,16 +1281,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
-    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
+    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics,
+        _tableNameWithType);
 
     String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
     _partitionLevelStreamConfig =
-        new PartitionLevelStreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig));
+        new PartitionLevelStreamConfig(_tableNameWithType,
+            IngestionConfigUtils.getStreamConfigMap(_tableConfig));
     _streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig);
     _streamPartitionMsgOffsetFactory =
-        StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory();
+        StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig)
+            .createStreamMsgOffsetFactory();
     _streamTopic = _partitionLevelStreamConfig.getTopicName();
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
@@ -1213,26 +1307,31 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
     _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId;
-    _segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
+    _segmentLogger = LoggerFactory
+        .getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
     _tableStreamName = _tableNameWithType + "_" + _streamTopic;
     _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
-        indexLoadingConfig.isRealtimeOffHeapAllocation(), indexLoadingConfig.isDirectRealtimeOffHeapAllocation(),
+        indexLoadingConfig.isRealtimeOffHeapAllocation(),
+        indexLoadingConfig.isDirectRealtimeOffHeapAllocation(),
         serverMetrics);
 
     List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
     if (sortedColumns.isEmpty()) {
-      _segmentLogger.info("RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}",
+      _segmentLogger.info(
+          "RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}",
           _llcSegmentName);
       _sortedColumn = null;
     } else {
       String firstSortedColumn = sortedColumns.get(0);
       if (_schema.hasColumn(firstSortedColumn)) {
-        _segmentLogger.info("Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}",
+        _segmentLogger.info(
+            "Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}",
             firstSortedColumn, _llcSegmentName);
         _sortedColumn = firstSortedColumn;
       } else {
         _segmentLogger
-            .warn("Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.",
+            .warn(
+                "Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.",
                 firstSortedColumn, _llcSegmentName);
         _sortedColumn = null;
       }
@@ -1251,7 +1350,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // No dictionary Columns
     _noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns());
 
-    _varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns());
+    _varLengthDictionaryColumns = new ArrayList<>(
+        indexLoadingConfig.getVarLengthDictionaryColumns());
 
     // Read the max number of rows
     int segmentMaxRowCount = _partitionLevelStreamConfig.getFlushThresholdRows();
@@ -1274,26 +1374,32 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Start new realtime segment
     String consumerDir = realtimeTableDataManager.getConsumerDir();
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentNameStr)
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType)
+            .setSegmentName(_segmentNameStr)
             .setStreamName(_streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
-            .setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
+            .setCapacity(_segmentMaxRowCount)
+            .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
             .setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(textIndexColumns)
-            .setFSTIndexColumns(fstIndexColumns).setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
-            .setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()).setSegmentZKMetadata(segmentZKMetadata)
+            .setFSTIndexColumns(fstIndexColumns)
+            .setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
+            .setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs())
+            .setSegmentZKMetadata(segmentZKMetadata)
             .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
-            .setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled)
+            .setAggregateMetrics(indexingConfig.isAggregateMetrics())
+            .setNullHandlingEnabled(_nullHandlingEnabled)
             .setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
             .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
             .setHashFunction(tableConfig.getHashFunction())
             .setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn());
 
     // Create message decoder
-    Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
+    Set<String> fieldsToRead = IngestionUtils
+        .getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
     _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
-    _clientId = _streamTopic + "-" + _partitionGroupId;
+    _clientId = _instanceId + "-" + _streamTopic + "-" + _partitionGroupId;
 
     // Create record transformer
     _recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
@@ -1315,9 +1421,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
     SegmentPartitionConfig segmentPartitionConfig = indexingConfig.getSegmentPartitionConfig();
     if (segmentPartitionConfig != null) {
-      Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
+      Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig
+          .getColumnPartitionMap();
       if (columnPartitionMap.size() == 1) {
-        Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator().next();
+        Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator()
+            .next();
         String partitionColumn = entry.getKey();
         ColumnPartitionConfig columnPartitionConfig = entry.getValue();
         String partitionFunctionName = columnPartitionConfig.getFunctionName();
@@ -1342,7 +1450,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             _segmentLogger.warn(
                 "Number of stream partitions: {} does not match number of partitions in the partition config: {}, "
                     + "using number of stream " + "partitions", numPartitionGroups, numPartitions);
-            _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
+            _serverMetrics
+                .addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH,
+                    1);
             numPartitions = numPartitionGroups;
           }
         } catch (Exception e) {
@@ -1354,10 +1464,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
         realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn);
         realtimeSegmentConfigBuilder
-            .setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions));
+            .setPartitionFunction(PartitionFunctionFactory
+                .getPartitionFunction(partitionFunctionName, numPartitions));
         realtimeSegmentConfigBuilder.setPartitionId(_partitionGroupId);
       } else {
-        _segmentLogger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
+        _segmentLogger
+            .warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
       }
     }
 
@@ -1374,10 +1486,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     try (StreamMetadataProvider metadataProvider = _streamConsumerFactory
         .createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
       _latestStreamOffsetAtStartupTime = metadataProvider
-          .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000);
+          .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/
+              5000);
     } catch (Exception e) {
-      _segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId,
-          _partitionGroupId);
+      _segmentLogger
+          .warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}",
+              _clientId,
+              _partitionGroupId);
     }
 
     long now = now();
@@ -1394,16 +1509,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // cases, we let some minimum consumption happen before we attempt to complete the segment (unless, of course
     // the max consumption time has been configured to be less than the minimum time we use in this class).
     long minConsumeTimeMillis =
-        Math.min(maxConsumeTimeMillis, TimeUnit.MILLISECONDS.convert(MINIMUM_CONSUME_TIME_MINUTES, TimeUnit.MINUTES));
+        Math.min(maxConsumeTimeMillis,
+            TimeUnit.MILLISECONDS.convert(MINIMUM_CONSUME_TIME_MINUTES, TimeUnit.MINUTES));
     if (_consumeEndTime - now < minConsumeTimeMillis) {
       _consumeEndTime = now + minConsumeTimeMillis;
     }
 
     _segmentCommitterFactory =
-        new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
+        new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig,
+            indexLoadingConfig, serverMetrics);
 
     _segmentLogger
-        .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName,
+        .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",
+            _llcSegmentName,
             _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC).toString());
     start();
   }
@@ -1417,7 +1535,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     _segmentLogger.info("Creating new stream consumer, reason: {}", reason);
     _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+        _streamConsumerFactory
+            .createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
   }
 
   /**
@@ -1441,7 +1560,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Number of rows indexed should be used for DOCUMENT_COUNT metric, and also for segment flush. Whereas,
     // Number of rows consumed should be used for consumption metric.
     long rowsIndexed = _numRowsIndexed - _lastUpdatedRowsIndexed.get();
-    _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed);
+    _serverMetrics
+        .addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed);
     _lastUpdatedRowsIndexed.set(_numRowsIndexed);
     final long now = now();
     final int rowsConsumed = _numRowsConsumed - _lastConsumedCount;
@@ -1451,7 +1571,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         || rowsConsumed >= MSG_COUNT_THRESHOLD_FOR_LOG) {
       _segmentLogger.info(
           "Consumed {} events from (rate:{}/s), currentOffset={}, numRowsConsumedSoFar={}, numRowsIndexedSoFar={}",
-          rowsConsumed, (float) (rowsConsumed) * 1000 / (now - prevTime), _currentOffset, _numRowsConsumed,
+          rowsConsumed, (float) (rowsConsumed) * 1000 / (now - prevTime), _currentOffset,
+          _numRowsConsumed,
           _numRowsIndexed);
       _lastConsumedCount = _numRowsConsumed;
       _lastLogTime = now;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index 735c91d..9748945 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -21,12 +21,15 @@ package org.apache.pinot.spi.stream;
 import java.io.Closeable;
 import java.util.concurrent.TimeoutException;
 
-
 /**
  * Consumer interface for consuming from a partition group of a stream
  */
 public interface PartitionGroupConsumer extends Closeable {
 
+  default void start(StreamPartitionMsgOffset startOffset) {
+
+  }
+
   /**
    * Fetch messages and offsets from the stream partition group
    *
@@ -37,6 +40,16 @@ public interface PartitionGroupConsumer extends Closeable {
    * milliseconds
    * @return An iterable containing messages fetched from the stream partition and their offsets
    */
-  MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, int timeoutMs)
+  MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset,
+      StreamPartitionMsgOffset endOffset, int timeoutMs)
       throws TimeoutException;
+
+  default StreamPartitionMsgOffset commit(
+      final StreamPartitionMsgOffset currentOffset) {
+    return null;
+  }
+
+  default void rollback() {
+
+  }
 }
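
To make the new lifecycle concrete, here is a minimal sketch of an
implementation, assuming a hypothetical pub-sub client: PubSubClient,
Subscription, toBatch(), and the methods on them are invented for
illustration; only the PartitionGroupConsumer SPI shown above comes from
Pinot.

  import java.io.IOException;
  import java.util.concurrent.TimeoutException;
  import org.apache.pinot.spi.stream.MessageBatch;
  import org.apache.pinot.spi.stream.PartitionGroupConsumer;
  import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

  public class SketchPubSubConsumer implements PartitionGroupConsumer {
    private final PubSubClient _client;   // hypothetical pub-sub client
    private Subscription _subscription;   // hypothetical subscription handle

    public SketchPubSubConsumer(PubSubClient client) {
      _client = client;
    }

    @Override
    public void start(StreamPartitionMsgOffset startOffset) {
      // Open the subscription once at startOffset instead of seeking per fetch.
      _subscription = _client.subscribe(startOffset);
    }

    @Override
    public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset,
        StreamPartitionMsgOffset endOffset, int timeoutMs)
        throws TimeoutException {
      // Pull whatever arrives within timeoutMs and wrap it as a MessageBatch
      // (toBatch() is a hypothetical adapter).
      return toBatch(_subscription.poll(timeoutMs));
    }

    @Override
    public StreamPartitionMsgOffset commit(StreamPartitionMsgOffset currentOffset) {
      // Acknowledge delivery up to currentOffset; return the offset the broker
      // actually settled on, or null to let Pinot keep its own offset.
      return _subscription.ack(currentOffset);
    }

    @Override
    public void rollback() {
      // The segment commit failed upstream; un-acknowledge so the broker
      // redelivers the messages on the next fetch.
      _subscription.nack();
    }

    @Override
    public void close() throws IOException {
      _subscription.close();
    }
  }

Per the LLRealtimeSegmentDataManager hunk above, Pinot calls
commit(_currentOffset) when the controller responds COMMIT, replaces
_currentOffset with a non-null return value before building the segment, and
calls rollback() if the state machine does not end in COMMITTED.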

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org