Posted to commits@pinot.apache.org by ki...@apache.org on 2021/11/25 19:14:56 UTC

[pinot] 01/01: Interface changes needed to support pub-sub

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch pub-sub
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit a485eaf0a462a6003d7313d4417a1ad17311b63f
Author: kishoreg <g....@gmail.com>
AuthorDate: Thu Nov 25 09:14:05 2021 -1000

    Interface changes needed to support pub-sub
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 341 ++++++++++++++-------
 .../pinot/spi/stream/PartitionGroupConsumer.java   |  19 +-
 2 files changed, 247 insertions(+), 113 deletions(-)
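
In brief: PartitionGroupConsumer gains three default lifecycle hooks, start(startOffset), commit(currentOffset), and rollback(). When the controller responds with COMMIT, LLRealtimeSegmentDataManager now calls commit() on the consumer before building the segment, adopting the returned offset when it is non-null, and calls rollback() if the segment does not end up in the COMMITTED state, presumably so a pub-sub stream can acknowledge delivered messages or request re-delivery. The consumer client id now also embeds the server instance id. The remaining LLRealtimeSegmentDataManager changes are formatting-only line re-wraps.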

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index a36935d..a7dc545 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -94,11 +94,12 @@ import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * Segment data manager for low level consumer realtime segments, which manages consumption and segment completion.
+ * Segment data manager for low level consumer realtime segments, which manages consumption and
+ * segment completion.
  */
 public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
+
   protected enum State {
     // The state machine starts off with this state. While in this state we consume stream events
     // and index them in memory. We continue to be in this state until the end criteria is satisfied
@@ -140,11 +141,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     ERROR;
 
     public boolean shouldConsume() {
-      return this.equals(INITIAL_CONSUMING) || this.equals(CATCHING_UP) || this.equals(CONSUMING_TO_ONLINE);
+      return this.equals(INITIAL_CONSUMING) || this.equals(CATCHING_UP) || this
+          .equals(CONSUMING_TO_ONLINE);
     }
 
     public boolean isFinal() {
-      return this.equals(ERROR) || this.equals(COMMITTED) || this.equals(RETAINED) || this.equals(DISCARDED);
+      return this.equals(ERROR) || this.equals(COMMITTED) || this.equals(RETAINED) || this
+          .equals(DISCARDED);
     }
   }
 
@@ -152,6 +155,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   @VisibleForTesting
   public class SegmentBuildDescriptor {
+
     final File _segmentTarFile;
     final Map<String, File> _metadataFileMap;
     final StreamPartitionMsgOffset _offset;
@@ -159,8 +163,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     final long _buildTimeMillis;
     final long _segmentSizeBytes;
 
-    public SegmentBuildDescriptor(@Nullable File segmentTarFile, @Nullable Map<String, File> metadataFileMap,
-        StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
+    public SegmentBuildDescriptor(@Nullable File segmentTarFile,
+        @Nullable Map<String, File> metadataFileMap,
+        StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis,
+        long segmentSizeBytes) {
       _segmentTarFile = segmentTarFile;
       _metadataFileMap = metadataFileMap;
       _offset = _streamPartitionMsgOffsetFactory.create(offset);
@@ -301,23 +307,27 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         // the max time we are allowed to consume.
         if (now >= _consumeEndTime) {
           if (_realtimeSegment.getNumDocsIndexed() == 0) {
-            _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
+            _segmentLogger.info("No events came in, extending time by {} hours",
+                TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
             _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
             return false;
           }
           _segmentLogger
-              .info("Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}",
+              .info(
+                  "Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}",
                   _startTimeMs, now, _numRowsConsumed, _numRowsIndexed);
           _stopReason = SegmentCompletionProtocol.REASON_TIME_LIMIT;
           return true;
         } else if (_numRowsIndexed >= _segmentMaxRowCount) {
-          _segmentLogger.info("Stopping consumption due to row limit nRows={} numRowsIndexed={}, numRowsConsumed={}",
+          _segmentLogger.info(
+              "Stopping consumption due to row limit nRows={} numRowsIndexed={}, numRowsConsumed={}",
               _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
           _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
           return true;
         } else if (_endOfPartitionGroup) {
-          _segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, "
-              + "numRowsConsumed={}", _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+          _segmentLogger.info(
+              "Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, "
+                  + "numRowsConsumed={}", _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
           _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
           return true;
         }
@@ -334,7 +344,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           return true;
         }
         if (_currentOffset.compareTo(_finalOffset) > 0) {
-          _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), _currentOffset,
+          _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(),
+              _currentOffset,
               _finalOffset);
           throw new RuntimeException("Past max offset");
         }
@@ -348,11 +359,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           _segmentLogger.info("Caught up to offset={}, state={}", _finalOffset, _state.toString());
           return true;
         } else if (now >= _consumeEndTime) {
-          _segmentLogger.info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString());
+          _segmentLogger
+              .info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString());
           return true;
         }
         if (_currentOffset.compareTo(_finalOffset) > 0) {
-          _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), _currentOffset,
+          _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(),
+              _currentOffset,
               _finalOffset);
           throw new RuntimeException("Past max offset");
         }
@@ -367,12 +380,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       throws Exception {
     _consecutiveErrorCount++;
     if (_consecutiveErrorCount > MAX_CONSECUTIVE_ERROR_COUNT) {
-      _segmentLogger.warn("Stream transient exception when fetching messages, stopping consumption after {} attempts",
+      _segmentLogger.warn(
+          "Stream transient exception when fetching messages, stopping consumption after {} attempts",
           _consecutiveErrorCount, e);
       throw e;
     } else {
       _segmentLogger
-          .warn("Stream transient exception when fetching messages, retrying (count={})", _consecutiveErrorCount, e);
+          .warn("Stream transient exception when fetching messages, retrying (count={})",
+              _consecutiveErrorCount, e);
       Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
       makeStreamConsumer("Too many transient errors");
     }
@@ -382,8 +397,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       throws Exception {
     _numRowsErrored = 0;
     final long idlePipeSleepTimeMillis = 100;
-    final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
-        .getFetchTimeoutMillis());  // 3 minute count
+    final long maxIdleCountBeforeStatUpdate =
+        (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
+            .getFetchTimeoutMillis());  // 3 minute count
     StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory
         .create(_currentOffset);  // so that we always update the metric when we enter this method.
     long consecutiveIdleCount = 0;
@@ -391,14 +407,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // anymore. Remove the file if it exists.
     removeSegmentFile();
 
-    _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset, _finalOffset);
+    _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset,
+        _finalOffset);
     while (!_shouldStop && !endCriteriaReached()) {
       // Consume for the next readTime ms, or we get to final offset, whichever happens earlier,
       // Update _currentOffset upon return from this method
       MessageBatch messageBatch;
       try {
         messageBatch = _partitionGroupConsumer
-            .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+            .fetchMessages(_currentOffset, null,
+                _partitionLevelStreamConfig.getFetchTimeoutMillis());
         _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
         _consecutiveErrorCount = 0;
       } catch (TimeoutException e) {
@@ -408,7 +426,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         handleTransientStreamErrors(e);
         continue;
       } catch (PermanentConsumerException e) {
-        _segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", e);
+        _segmentLogger
+            .warn("Permanent exception from stream when fetching messages, stopping consumption",
+                e);
         throw e;
       } catch (Exception e) {
         // Unknown exception from stream. Treat as a transient exception.
@@ -435,7 +455,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         // Create a new stream consumer wrapper, in case we are stuck on something.
         consecutiveIdleCount++;
         if (consecutiveIdleCount > maxIdleCountBeforeStatUpdate) {
-          _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
+          _serverMetrics
+              .setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
           consecutiveIdleCount = 0;
           makeStreamConsumer("Idle for too long");
         }
@@ -443,8 +464,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
 
     if (_numRowsErrored > 0) {
-      _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
-      _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
+      _serverMetrics
+          .addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
+      _serverMetrics
+          .addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
     }
     return true;
   }
@@ -479,7 +502,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         //    Throw an exception.
         //
         _segmentLogger
-            .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed, _numRowsIndexed,
+            .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed,
+                _numRowsIndexed,
                 _segmentMaxRowCount);
         throw new RuntimeException("Realtime segment full");
       }
@@ -491,7 +515,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);
 
       GenericRow decodedRow = _messageDecoder
-          .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
+          .decode(messagesAndOffsets.getMessageAtIndex(index),
+              messagesAndOffsets.getMessageOffsetAtIndex(index),
               messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
       if (decodedRow != null) {
         try {
@@ -500,7 +525,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             decodedRow = _complexTypeTransformer.transform(decodedRow);
           }
           if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
-            for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+            for (Object singleRow : (Collection) decodedRow
+                .getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
               GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
               if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
                 realtimeRowsConsumedMeter = _serverMetrics
@@ -510,7 +536,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
                 canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
               } else {
                 realtimeRowsDroppedMeter = _serverMetrics
-                    .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                    .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED,
+                        1,
                         realtimeRowsDroppedMeter);
               }
             }
@@ -524,16 +551,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
             } else {
               realtimeRowsDroppedMeter = _serverMetrics
-                  .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                  .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED,
+                      1,
                       realtimeRowsDroppedMeter);
             }
           }
         } catch (Exception e) {
-          String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
+          String errorMessage = String
+              .format("Caught exception while transforming the record: %s", decodedRow);
           _segmentLogger.error(errorMessage, e);
           _numRowsErrored++;
           _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+              .addSegmentError(_segmentNameStr,
+                  new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
         }
       } else {
         realtimeRowsDroppedMeter = _serverMetrics
@@ -548,7 +578,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     updateCurrentDocumentCountMetrics();
     if (streamMessageCount != 0) {
-      _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount,
+      _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
+          indexedMessageCount,
           streamMessageCount, _currentOffset);
     } else {
       // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
@@ -557,6 +588,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   }
 
   public class PartitionConsumer implements Runnable {
+
     public void run() {
       long initialConsumptionEnd = 0L;
       long lastCatchUpStart = 0L;
@@ -567,7 +599,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           if (_state.shouldConsume()) {
             consumeLoop();  // Consume until we reached the end criteria, or we are stopped.
           }
-          _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+          _serverMetrics
+              .setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
           if (_shouldStop) {
             break;
           }
@@ -580,7 +613,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           } else if (_state == State.CATCHING_UP) {
             catchUpTimeMillis += now() - lastCatchUpStart;
             _serverMetrics
-                .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
+                .setValueOfTableGauge(_metricKeyName,
+                    ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
                     TimeUnit.MILLISECONDS.toSeconds(catchUpTimeMillis));
           }
 
@@ -599,8 +633,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             case CATCH_UP:
               if (rspOffset.compareTo(_currentOffset) <= 0) {
                 // Something wrong with the controller. Back off and try again.
-                _segmentLogger.error("Invalid catchup offset {} in controller response, current offset {}", rspOffset,
-                    _currentOffset);
+                _segmentLogger
+                    .error("Invalid catchup offset {} in controller response, current offset {}",
+                        rspOffset,
+                        _currentOffset);
                 hold();
               } else {
                 _state = State.CATCHING_UP;
@@ -631,7 +667,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
                     // Could not build segment for some reason. We can only download it.
                     _state = State.ERROR;
                     _realtimeTableDataManager.addSegmentError(_segmentNameStr,
-                        new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", null));
+                        new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment",
+                            null));
                   }
                   break;
                 default:
@@ -640,13 +677,21 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               break;
             case COMMIT:
               _state = State.COMMITTING;
+              StreamPartitionMsgOffset endOffset = _partitionGroupConsumer.commit(_currentOffset);
+              if (endOffset != null) {
+                _currentOffset = endOffset;
+              }
+
               long buildTimeSeconds = response.getBuildTimeSeconds();
+
               buildSegmentForCommit(buildTimeSeconds * 1000L);
+              boolean commitSuccess = false;
               if (_segmentBuildDescriptor == null) {
                 // We could not build the segment. Go into error state.
                 _state = State.ERROR;
                 _realtimeTableDataManager.addSegmentError(_segmentNameStr,
-                    new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", null));
+                    new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment",
+                        null));
               } else {
                 success = commitSegment(response.getControllerVipUrl(),
                     response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit());
@@ -660,9 +705,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
                   hold();
                 }
               }
+              if (_state != State.COMMITTED) {
+                _partitionGroupConsumer.rollback();
+              }
+
               break;
             default:
-              _segmentLogger.error("Holding after response from Controller: {}", response.toJsonString());
+              _segmentLogger
+                  .error("Holding after response from Controller: {}", response.toJsonString());
               hold();
               break;
           }
@@ -673,7 +723,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         postStopConsumedMsg(e.getClass().getName());
         _state = State.ERROR;
         _realtimeTableDataManager
-            .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+            .addSegmentError(_segmentNameStr,
+                new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
         return;
       }
@@ -682,7 +733,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
       if (initialConsumptionEnd != 0L) {
         _serverMetrics
-            .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
+            .setValueOfTableGauge(_metricKeyName,
+                ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
                 TimeUnit.MILLISECONDS.toSeconds(now() - initialConsumptionEnd));
       }
       // There is a race condition that the destroy() method can be called which ends up calling stop on the consumer.
@@ -703,7 +755,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private CompletionMode getSegmentCompletionMode() {
     CompletionConfig completionConfig = _tableConfig.getValidationConfig().getCompletionConfig();
     if (completionConfig != null) {
-      if (CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode())) {
+      if (CompletionMode.DOWNLOAD.toString()
+          .equalsIgnoreCase(completionConfig.getCompletionMode())) {
         return CompletionMode.DOWNLOAD;
       }
     }
@@ -724,7 +777,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // built the segment successfully.
   protected void buildSegmentForCommit(long buildTimeLeaseMs) {
     try {
-      if (_segmentBuildDescriptor != null && _segmentBuildDescriptor.getOffset().compareTo(_currentOffset) == 0) {
+      if (_segmentBuildDescriptor != null
+          && _segmentBuildDescriptor.getOffset().compareTo(_currentOffset) == 0) {
         // Double-check that we have the file, just in case.
         File segmentTarFile = _segmentBuildDescriptor.getSegmentTarFile();
         if (segmentTarFile != null && segmentTarFile.exists()) {
@@ -734,7 +788,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       removeSegmentFile();
       if (buildTimeLeaseMs <= 0) {
         if (_segBuildSemaphore == null) {
-          buildTimeLeaseMs = SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds() * 1000L;
+          buildTimeLeaseMs =
+              SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds() * 1000L;
         } else {
           // We know we are going to use a semaphore to limit number of segment builds, and could be
           // blocked for a long time. The controller has not provided a lease time, so set one to
@@ -806,7 +861,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" + _segmentNameStr + "-" + now());
       // lets convert the segment now
       RealtimeSegmentConverter converter =
-          new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema,
+          new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(),
+              _schema,
               _tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(), _sortedColumn,
               _invertedIndexColumns, _textIndexColumns, _fstIndexColumns, _noDictionaryColumns,
               _varLengthDictionaryColumns, _nullHandlingEnabled);
@@ -821,7 +877,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       final long buildTimeMillis = now() - lockAcquireTimeMillis;
       final long waitTimeMillis = lockAcquireTimeMillis - startTimeMillis;
       _segmentLogger
-          .info("Successfully built segment in {} ms, after lockWaitTime {} ms", buildTimeMillis, waitTimeMillis);
+          .info("Successfully built segment in {} ms, after lockWaitTime {} ms", buildTimeMillis,
+              waitTimeMillis);
 
       File dataDir = new File(_resourceDataDir);
       File indexDir = new File(dataDir, _segmentNameStr);
@@ -834,55 +891,67 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         FileUtils.moveDirectory(tempIndexDir, indexDir);
       } catch (IOException e) {
         String errorMessage =
-            String.format("Caught exception while moving index directory from: %s to: %s", tempIndexDir, indexDir);
+            String.format("Caught exception while moving index directory from: %s to: %s",
+                tempIndexDir, indexDir);
         _segmentLogger.error(errorMessage, e);
         _realtimeTableDataManager
-            .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+            .addSegmentError(_segmentNameStr,
+                new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
         return null;
       } finally {
         FileUtils.deleteQuietly(tempSegmentFolder);
       }
 
       long segmentSizeBytes = FileUtils.sizeOfDirectory(indexDir);
-      _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
+      _serverMetrics.setValueOfTableGauge(_metricKeyName,
+          ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
           TimeUnit.MILLISECONDS.toSeconds(buildTimeMillis));
-      _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
+      _serverMetrics.setValueOfTableGauge(_metricKeyName,
+          ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
           TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
 
       if (forCommit) {
-        File segmentTarFile = new File(dataDir, _segmentNameStr + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        File segmentTarFile = new File(dataDir,
+            _segmentNameStr + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
         try {
           TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
         } catch (IOException e) {
           String errorMessage =
-              String.format("Caught exception while taring index directory from: %s to: %s", indexDir, segmentTarFile);
+              String
+                  .format("Caught exception while taring index directory from: %s to: %s", indexDir,
+                      segmentTarFile);
           _segmentLogger.error(errorMessage, e);
           _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+              .addSegmentError(_segmentNameStr,
+                  new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
           return null;
         }
 
         File metadataFile = SegmentDirectoryPaths.findMetadataFile(indexDir);
         if (metadataFile == null) {
           _segmentLogger
-              .error("Failed to find file: {} under index directory: {}", V1Constants.MetadataKeys.METADATA_FILE_NAME,
+              .error("Failed to find file: {} under index directory: {}",
+                  V1Constants.MetadataKeys.METADATA_FILE_NAME,
                   indexDir);
           return null;
         }
         File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
         if (creationMetaFile == null) {
           _segmentLogger
-              .error("Failed to find file: {} under index directory: {}", V1Constants.SEGMENT_CREATION_META, indexDir);
+              .error("Failed to find file: {} under index directory: {}",
+                  V1Constants.SEGMENT_CREATION_META, indexDir);
           return null;
         }
         Map<String, File> metadataFiles = new HashMap<>();
         metadataFiles.put(V1Constants.MetadataKeys.METADATA_FILE_NAME, metadataFile);
         metadataFiles.put(V1Constants.SEGMENT_CREATION_META, creationMetaFile);
 
-        return new SegmentBuildDescriptor(segmentTarFile, metadataFiles, _currentOffset, buildTimeMillis,
+        return new SegmentBuildDescriptor(segmentTarFile, metadataFiles, _currentOffset,
+            buildTimeMillis,
             waitTimeMillis, segmentSizeBytes);
       } else {
-        return new SegmentBuildDescriptor(null, null, _currentOffset, buildTimeMillis, waitTimeMillis,
+        return new SegmentBuildDescriptor(null, null, _currentOffset, buildTimeMillis,
+            waitTimeMillis,
             segmentSizeBytes);
       }
     } catch (InterruptedException e) {
@@ -904,7 +973,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     SegmentCompletionProtocol.Response commitResponse = commit(controllerVipUrl, isSplitCommit);
 
-    if (!commitResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
+    if (!commitResponse.getStatus()
+        .equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
       return false;
     }
 
@@ -913,7 +983,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     return true;
   }
 
-  protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, boolean isSplitCommit) {
+  protected SegmentCompletionProtocol.Response commit(String controllerVipUrl,
+      boolean isSplitCommit) {
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
 
     params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
@@ -927,7 +998,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
     SegmentCommitter segmentCommitter;
     try {
-      segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(isSplitCommit, params, controllerVipUrl);
+      segmentCommitter = _segmentCommitterFactory
+          .createSegmentCommitter(isSplitCommit, params, controllerVipUrl);
     } catch (URISyntaxException e) {
       _segmentLogger.error("Failed to create a segment committer: ", e);
       return SegmentCompletionProtocol.RESP_NOT_SENT;
@@ -971,12 +1043,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   /**
    * Cleans up the metrics that reflects the state of the realtime segment.
-   * This step is essential as the instance may not be the target location for some of the partitions.
-   * E.g. if the number of partitions increases, or a host swap is needed, the target location for some partitions
+   * This step is essential as the instance may not be the target location for some of the
+   * partitions.
+   * E.g. if the number of partitions increases, or a host swap is needed, the target location for
+   * some partitions
    * may change,
-   * and the current host remains to run. In this case, the current server would still keep the state of the old
+   * and the current host remains to run. In this case, the current server would still keep the
+   * state of the old
    * partitions,
-   * which no longer resides in this host any more, thus causes false positive information to the metric system.
+   * which no longer resides in this host any more, thus causes false positive information to the
+   * metric system.
    */
   private void cleanupMetrics() {
     _serverMetrics.removeTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING);
@@ -994,10 +1070,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   protected void postStopConsumedMsg(String reason) {
     do {
       SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-      params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason).withSegmentName(_segmentNameStr)
+      params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason)
+          .withSegmentName(_segmentNameStr)
           .withInstanceId(_instanceId);
 
-      SegmentCompletionProtocol.Response response = _protocolHandler.segmentStoppedConsuming(params);
+      SegmentCompletionProtocol.Response response = _protocolHandler
+          .segmentStoppedConsuming(params);
       if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED) {
         _segmentLogger.info("Got response {}", response.toJsonString());
         break;
@@ -1036,7 +1114,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       final StreamPartitionMsgOffset endOffset =
           _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
       _segmentLogger
-          .info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(),
+          .info(
+              "State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})",
+              _state.toString(),
               _startOffset, endOffset);
       stop();
       _segmentLogger.info("Consumer thread stopped in state {}", _state.toString());
@@ -1058,8 +1138,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           CompletionMode segmentCompletionMode = getSegmentCompletionMode();
           switch (segmentCompletionMode) {
             case DOWNLOAD:
-              _segmentLogger.info("State {}. CompletionMode {}. Downloading to replace", _state.toString(),
-                  segmentCompletionMode);
+              _segmentLogger
+                  .info("State {}. CompletionMode {}. Downloading to replace", _state.toString(),
+                      segmentCompletionMode);
               downloadSegmentAndReplace(segmentZKMetadata);
               break;
             case DEFAULT:
@@ -1067,23 +1148,28 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               if (_currentOffset.compareTo(endOffset) > 0) {
                 // We moved ahead of the offset that is committed in ZK.
                 _segmentLogger
-                    .warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", _currentOffset,
+                    .warn("Current offset {} ahead of the offset in zk {}. Downloading to replace",
+                        _currentOffset,
                         endOffset);
                 downloadSegmentAndReplace(segmentZKMetadata);
               } else if (_currentOffset.compareTo(endOffset) == 0) {
                 _segmentLogger
-                    .info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset, endOffset);
+                    .info("Current offset {} matches offset in zk {}. Replacing segment",
+                        _currentOffset, endOffset);
                 buildSegmentAndReplace();
               } else {
-                _segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset, endOffset);
+                _segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset,
+                    endOffset);
                 boolean success = catchupToFinalOffset(endOffset,
-                    TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS));
+                    TimeUnit.MILLISECONDS
+                        .convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS));
                 if (success) {
                   _segmentLogger.info("Caught up to offset {}", _currentOffset);
                   buildSegmentAndReplace();
                 } else {
                   _segmentLogger
-                      .info("Could not catch up to offset (current = {}). Downloading to replace", _currentOffset);
+                      .info("Could not catch up to offset (current = {}). Downloading to replace",
+                          _currentOffset);
                   downloadSegmentAndReplace(segmentZKMetadata);
                 }
               }
@@ -1093,7 +1179,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           }
           break;
         default:
-          _segmentLogger.info("Downloading to replace segment while in state {}", _state.toString());
+          _segmentLogger
+              .info("Downloading to replace segment while in state {}", _state.toString());
           downloadSegmentAndReplace(segmentZKMetadata);
           break;
       }
@@ -1107,7 +1194,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata) {
     closeStreamConsumers();
     _realtimeTableDataManager
-        .downloadAndReplaceSegment(_segmentNameStr, segmentZKMetadata, _indexLoadingConfig, _tableConfig);
+        .downloadAndReplaceSegment(_segmentNameStr, segmentZKMetadata, _indexLoadingConfig,
+            _tableConfig);
   }
 
   protected long now() {
@@ -1130,7 +1218,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     if (_currentOffset.compareTo(endOffset) != 0) {
       // Timeout?
-      _segmentLogger.error("Could not consume up to {} (current offset {})", endOffset, _currentOffset);
+      _segmentLogger
+          .error("Could not consume up to {} (current offset {})", endOffset, _currentOffset);
       return false;
     }
 
@@ -1175,9 +1264,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // Assume that this is called only on OFFLINE to CONSUMING transition.
   // If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here.
   public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
-      RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
+      RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir,
+      IndexLoadingConfig indexLoadingConfig,
       Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore,
-      ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+      ServerMetrics serverMetrics,
+      @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = segmentZKMetadata;
     _tableConfig = tableConfig;
@@ -1190,16 +1281,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
-    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
+    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics,
+        _tableNameWithType);
 
     String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
     _partitionLevelStreamConfig =
-        new PartitionLevelStreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig));
+        new PartitionLevelStreamConfig(_tableNameWithType,
+            IngestionConfigUtils.getStreamConfigMap(_tableConfig));
     _streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig);
     _streamPartitionMsgOffsetFactory =
-        StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory();
+        StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig)
+            .createStreamMsgOffsetFactory();
     _streamTopic = _partitionLevelStreamConfig.getTopicName();
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
@@ -1213,26 +1307,31 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
     _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId;
-    _segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
+    _segmentLogger = LoggerFactory
+        .getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
     _tableStreamName = _tableNameWithType + "_" + _streamTopic;
     _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
-        indexLoadingConfig.isRealtimeOffHeapAllocation(), indexLoadingConfig.isDirectRealtimeOffHeapAllocation(),
+        indexLoadingConfig.isRealtimeOffHeapAllocation(),
+        indexLoadingConfig.isDirectRealtimeOffHeapAllocation(),
         serverMetrics);
 
     List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
     if (sortedColumns.isEmpty()) {
-      _segmentLogger.info("RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}",
+      _segmentLogger.info(
+          "RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}",
           _llcSegmentName);
       _sortedColumn = null;
     } else {
       String firstSortedColumn = sortedColumns.get(0);
       if (_schema.hasColumn(firstSortedColumn)) {
-        _segmentLogger.info("Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}",
+        _segmentLogger.info(
+            "Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}",
             firstSortedColumn, _llcSegmentName);
         _sortedColumn = firstSortedColumn;
       } else {
         _segmentLogger
-            .warn("Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.",
+            .warn(
+                "Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.",
                 firstSortedColumn, _llcSegmentName);
         _sortedColumn = null;
       }
@@ -1251,7 +1350,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // No dictionary Columns
     _noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns());
 
-    _varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns());
+    _varLengthDictionaryColumns = new ArrayList<>(
+        indexLoadingConfig.getVarLengthDictionaryColumns());
 
     // Read the max number of rows
     int segmentMaxRowCount = _partitionLevelStreamConfig.getFlushThresholdRows();
@@ -1274,26 +1374,32 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Start new realtime segment
     String consumerDir = realtimeTableDataManager.getConsumerDir();
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentNameStr)
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType)
+            .setSegmentName(_segmentNameStr)
             .setStreamName(_streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
-            .setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
+            .setCapacity(_segmentMaxRowCount)
+            .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
             .setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(textIndexColumns)
-            .setFSTIndexColumns(fstIndexColumns).setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
-            .setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()).setSegmentZKMetadata(segmentZKMetadata)
+            .setFSTIndexColumns(fstIndexColumns)
+            .setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
+            .setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs())
+            .setSegmentZKMetadata(segmentZKMetadata)
             .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
-            .setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled)
+            .setAggregateMetrics(indexingConfig.isAggregateMetrics())
+            .setNullHandlingEnabled(_nullHandlingEnabled)
             .setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
             .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
             .setHashFunction(tableConfig.getHashFunction())
             .setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn());
 
     // Create message decoder
-    Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
+    Set<String> fieldsToRead = IngestionUtils
+        .getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
     _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
-    _clientId = _streamTopic + "-" + _partitionGroupId;
+    _clientId = _instanceId + "-" + _streamTopic + "-" + _partitionGroupId;
 
     // Create record transformer
     _recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
@@ -1315,9 +1421,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
     SegmentPartitionConfig segmentPartitionConfig = indexingConfig.getSegmentPartitionConfig();
     if (segmentPartitionConfig != null) {
-      Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
+      Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig
+          .getColumnPartitionMap();
       if (columnPartitionMap.size() == 1) {
-        Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator().next();
+        Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator()
+            .next();
         String partitionColumn = entry.getKey();
         ColumnPartitionConfig columnPartitionConfig = entry.getValue();
         String partitionFunctionName = columnPartitionConfig.getFunctionName();
@@ -1342,7 +1450,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             _segmentLogger.warn(
                 "Number of stream partitions: {} does not match number of partitions in the partition config: {}, "
                     + "using number of stream " + "partitions", numPartitionGroups, numPartitions);
-            _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
+            _serverMetrics
+                .addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH,
+                    1);
             numPartitions = numPartitionGroups;
           }
         } catch (Exception e) {
@@ -1354,10 +1464,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
         realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn);
         realtimeSegmentConfigBuilder
-            .setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions));
+            .setPartitionFunction(PartitionFunctionFactory
+                .getPartitionFunction(partitionFunctionName, numPartitions));
         realtimeSegmentConfigBuilder.setPartitionId(_partitionGroupId);
       } else {
-        _segmentLogger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
+        _segmentLogger
+            .warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
       }
     }
 
@@ -1374,10 +1486,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     try (StreamMetadataProvider metadataProvider = _streamConsumerFactory
         .createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
       _latestStreamOffsetAtStartupTime = metadataProvider
-          .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000);
+          .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/
+              5000);
     } catch (Exception e) {
-      _segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId,
-          _partitionGroupId);
+      _segmentLogger
+          .warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}",
+              _clientId,
+              _partitionGroupId);
     }
 
     long now = now();
@@ -1394,16 +1509,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // cases, we let some minimum consumption happen before we attempt to complete the segment (unless, of course
     // the max consumption time has been configured to be less than the minimum time we use in this class).
     long minConsumeTimeMillis =
-        Math.min(maxConsumeTimeMillis, TimeUnit.MILLISECONDS.convert(MINIMUM_CONSUME_TIME_MINUTES, TimeUnit.MINUTES));
+        Math.min(maxConsumeTimeMillis,
+            TimeUnit.MILLISECONDS.convert(MINIMUM_CONSUME_TIME_MINUTES, TimeUnit.MINUTES));
     if (_consumeEndTime - now < minConsumeTimeMillis) {
       _consumeEndTime = now + minConsumeTimeMillis;
     }
 
     _segmentCommitterFactory =
-        new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
+        new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig,
+            indexLoadingConfig, serverMetrics);
 
     _segmentLogger
-        .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName,
+        .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",
+            _llcSegmentName,
             _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC).toString());
     start();
   }
@@ -1417,7 +1535,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
     _segmentLogger.info("Creating new stream consumer, reason: {}", reason);
     _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+        _streamConsumerFactory
+            .createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
   }
 
   /**
@@ -1441,7 +1560,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Number of rows indexed should be used for DOCUMENT_COUNT metric, and also for segment flush. Whereas,
     // Number of rows consumed should be used for consumption metric.
     long rowsIndexed = _numRowsIndexed - _lastUpdatedRowsIndexed.get();
-    _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed);
+    _serverMetrics
+        .addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed);
     _lastUpdatedRowsIndexed.set(_numRowsIndexed);
     final long now = now();
     final int rowsConsumed = _numRowsConsumed - _lastConsumedCount;
@@ -1451,7 +1571,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         || rowsConsumed >= MSG_COUNT_THRESHOLD_FOR_LOG) {
       _segmentLogger.info(
           "Consumed {} events from (rate:{}/s), currentOffset={}, numRowsConsumedSoFar={}, numRowsIndexedSoFar={}",
-          rowsConsumed, (float) (rowsConsumed) * 1000 / (now - prevTime), _currentOffset, _numRowsConsumed,
+          rowsConsumed, (float) (rowsConsumed) * 1000 / (now - prevTime), _currentOffset,
+          _numRowsConsumed,
           _numRowsIndexed);
       _lastConsumedCount = _numRowsConsumed;
       _lastLogTime = now;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index 735c91d..9748945 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -21,12 +21,15 @@ package org.apache.pinot.spi.stream;
 import java.io.Closeable;
 import java.util.concurrent.TimeoutException;
 
-
 /**
  * Consumer interface for consuming from a partition group of a stream
  */
 public interface PartitionGroupConsumer extends Closeable {
 
+  default void start(StreamPartitionMsgOffset startOffset) {
+
+  }
+
   /**
    * Fetch messages and offsets from the stream partition group
    *
@@ -37,6 +40,16 @@ public interface PartitionGroupConsumer extends Closeable {
    * milliseconds
    * @return An iterable containing messages fetched from the stream partition and their offsets
    */
-  MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, int timeoutMs)
+  MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset,
+      StreamPartitionMsgOffset endOffset, int timeoutMs)
       throws TimeoutException;
+
+  default StreamPartitionMsgOffset commit(
+      final StreamPartitionMsgOffset currentOffset) {
+    return null;
+  }
+
+  default void rollback() {
+
+  }
 }
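
For illustration, a skeletal pub-sub consumer built against the revised interface might look like the sketch below. This is not part of the commit: the class name and the ackUpTo()/nackAfter() transport operations are hypothetical stand-ins for whatever the underlying stream provides, and the diff itself does not yet show a call site for start().

    import java.io.IOException;
    import org.apache.pinot.spi.stream.PartitionGroupConsumer;
    import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

    // Hypothetical sketch: all names below are invented for illustration.
    public abstract class PubSubPartitionGroupConsumer implements PartitionGroupConsumer {

      // Tracks what has been handed to Pinot but not yet acknowledged upstream.
      private StreamPartitionMsgOffset _lastDeliveredOffset;

      @Override
      public void start(StreamPartitionMsgOffset startOffset) {
        // Open the subscription/session once, instead of lazily inside fetchMessages().
        _lastDeliveredOffset = startOffset;
      }

      @Override
      public StreamPartitionMsgOffset commit(StreamPartitionMsgOffset currentOffset) {
        // Acknowledge everything up to currentOffset. Returning a non-null offset
        // lets LLRealtimeSegmentDataManager adopt the offset the stream actually
        // sealed at; returning null (the interface default) keeps currentOffset.
        ackUpTo(currentOffset);
        _lastDeliveredOffset = null;
        return currentOffset;
      }

      @Override
      public void rollback() {
        // Invoked when the segment did not reach COMMITTED: request re-delivery of
        // unacknowledged messages so they are consumed again on the next attempt.
        if (_lastDeliveredOffset != null) {
          nackAfter(_lastDeliveredOffset);
        }
      }

      @Override
      public void close() throws IOException {
        // Tear down the subscription.
      }

      // Transport-level operations a concrete implementation would supply (hypothetical).
      protected abstract void ackUpTo(StreamPartitionMsgOffset offset);
      protected abstract void nackAfter(StreamPartitionMsgOffset offset);
    }

A concrete subclass would implement fetchMessages() to drain the subscription into a MessageBatch and keep _lastDeliveredOffset in step with the offsets it hands back.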
