Posted to commits@pinot.apache.org by ki...@apache.org on 2021/11/25 19:14:56 UTC
[pinot] 01/01: Interface changes needed to support pub-sub
This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch pub-sub
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit a485eaf0a462a6003d7313d4417a1ad17311b63f
Author: kishoreg <g....@gmail.com>
AuthorDate: Thu Nov 25 09:14:05 2021 -1000
Interface changes needed to support pub-sub
---
.../realtime/LLRealtimeSegmentDataManager.java | 341 ++++++++++++++-------
.../pinot/spi/stream/PartitionGroupConsumer.java | 19 +-
2 files changed, 247 insertions(+), 113 deletions(-)
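
Summary of the change: PartitionGroupConsumer gains three default lifecycle methods — start(startOffset), commit(currentOffset), and rollback() — and LLRealtimeSegmentDataManager wires the last two into the COMMIT branch of its state machine: the offset returned by commit() (if non-null) becomes the new current offset, and rollback() is called whenever the segment does not end up in the COMMITTED state. The consumer client ID now also embeds the server instance ID. The bulk of the remaining hunks are line-wrapping changes only. A hedged implementation sketch against the new interface follows the diff.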
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index a36935d..a7dc545 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -94,11 +94,12 @@ import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * Segment data manager for low level consumer realtime segments, which manages consumption and segment completion.
+ * Segment data manager for low level consumer realtime segments, which manages consumption and
+ * segment completion.
*/
public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
+
protected enum State {
// The state machine starts off with this state. While in this state we consume stream events
// and index them in memory. We continue to be in this state until the end criteria is satisfied
@@ -140,11 +141,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
ERROR;
public boolean shouldConsume() {
- return this.equals(INITIAL_CONSUMING) || this.equals(CATCHING_UP) || this.equals(CONSUMING_TO_ONLINE);
+ return this.equals(INITIAL_CONSUMING) || this.equals(CATCHING_UP) || this
+ .equals(CONSUMING_TO_ONLINE);
}
public boolean isFinal() {
- return this.equals(ERROR) || this.equals(COMMITTED) || this.equals(RETAINED) || this.equals(DISCARDED);
+ return this.equals(ERROR) || this.equals(COMMITTED) || this.equals(RETAINED) || this
+ .equals(DISCARDED);
}
}
@@ -152,6 +155,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
@VisibleForTesting
public class SegmentBuildDescriptor {
+
final File _segmentTarFile;
final Map<String, File> _metadataFileMap;
final StreamPartitionMsgOffset _offset;
@@ -159,8 +163,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
final long _buildTimeMillis;
final long _segmentSizeBytes;
- public SegmentBuildDescriptor(@Nullable File segmentTarFile, @Nullable Map<String, File> metadataFileMap,
- StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
+ public SegmentBuildDescriptor(@Nullable File segmentTarFile,
+ @Nullable Map<String, File> metadataFileMap,
+ StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis,
+ long segmentSizeBytes) {
_segmentTarFile = segmentTarFile;
_metadataFileMap = metadataFileMap;
_offset = _streamPartitionMsgOffsetFactory.create(offset);
@@ -301,23 +307,27 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// the max time we are allowed to consume.
if (now >= _consumeEndTime) {
if (_realtimeSegment.getNumDocsIndexed() == 0) {
- _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
+ _segmentLogger.info("No events came in, extending time by {} hours",
+ TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
_consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
return false;
}
_segmentLogger
- .info("Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}",
+ .info(
+ "Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}",
_startTimeMs, now, _numRowsConsumed, _numRowsIndexed);
_stopReason = SegmentCompletionProtocol.REASON_TIME_LIMIT;
return true;
} else if (_numRowsIndexed >= _segmentMaxRowCount) {
- _segmentLogger.info("Stopping consumption due to row limit nRows={} numRowsIndexed={}, numRowsConsumed={}",
+ _segmentLogger.info(
+ "Stopping consumption due to row limit nRows={} numRowsIndexed={}, numRowsConsumed={}",
_numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
_stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
return true;
} else if (_endOfPartitionGroup) {
- _segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, "
- + "numRowsConsumed={}", _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+ _segmentLogger.info(
+ "Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, "
+ + "numRowsConsumed={}", _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
_stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
return true;
}
@@ -334,7 +344,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
return true;
}
if (_currentOffset.compareTo(_finalOffset) > 0) {
- _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), _currentOffset,
+ _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(),
+ _currentOffset,
_finalOffset);
throw new RuntimeException("Past max offset");
}
@@ -348,11 +359,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentLogger.info("Caught up to offset={}, state={}", _finalOffset, _state.toString());
return true;
} else if (now >= _consumeEndTime) {
- _segmentLogger.info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString());
+ _segmentLogger
+ .info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString());
return true;
}
if (_currentOffset.compareTo(_finalOffset) > 0) {
- _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(), _currentOffset,
+ _segmentLogger.error("Offset higher in state={}, current={}, final={}", _state.toString(),
+ _currentOffset,
_finalOffset);
throw new RuntimeException("Past max offset");
}
@@ -367,12 +380,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
throws Exception {
_consecutiveErrorCount++;
if (_consecutiveErrorCount > MAX_CONSECUTIVE_ERROR_COUNT) {
- _segmentLogger.warn("Stream transient exception when fetching messages, stopping consumption after {} attempts",
+ _segmentLogger.warn(
+ "Stream transient exception when fetching messages, stopping consumption after {} attempts",
_consecutiveErrorCount, e);
throw e;
} else {
_segmentLogger
- .warn("Stream transient exception when fetching messages, retrying (count={})", _consecutiveErrorCount, e);
+ .warn("Stream transient exception when fetching messages, retrying (count={})",
+ _consecutiveErrorCount, e);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
makeStreamConsumer("Too many transient errors");
}
@@ -382,8 +397,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
throws Exception {
_numRowsErrored = 0;
final long idlePipeSleepTimeMillis = 100;
- final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
- .getFetchTimeoutMillis()); // 3 minute count
+ final long maxIdleCountBeforeStatUpdate =
+ (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
+ .getFetchTimeoutMillis()); // 3 minute count
StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory
.create(_currentOffset); // so that we always update the metric when we enter this method.
long consecutiveIdleCount = 0;
@@ -391,14 +407,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// anymore. Remove the file if it exists.
removeSegmentFile();
- _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset, _finalOffset);
+ _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset,
+ _finalOffset);
while (!_shouldStop && !endCriteriaReached()) {
// Consume for the next readTime ms, or we get to final offset, whichever happens earlier,
// Update _currentOffset upon return from this method
MessageBatch messageBatch;
try {
messageBatch = _partitionGroupConsumer
- .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+ .fetchMessages(_currentOffset, null,
+ _partitionLevelStreamConfig.getFetchTimeoutMillis());
_endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
_consecutiveErrorCount = 0;
} catch (TimeoutException e) {
@@ -408,7 +426,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
handleTransientStreamErrors(e);
continue;
} catch (PermanentConsumerException e) {
- _segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", e);
+ _segmentLogger
+ .warn("Permanent exception from stream when fetching messages, stopping consumption",
+ e);
throw e;
} catch (Exception e) {
// Unknown exception from stream. Treat as a transient exception.
@@ -435,7 +455,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Create a new stream consumer wrapper, in case we are stuck on something.
consecutiveIdleCount++;
if (consecutiveIdleCount > maxIdleCountBeforeStatUpdate) {
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
+ _serverMetrics
+ .setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
consecutiveIdleCount = 0;
makeStreamConsumer("Idle for too long");
}
@@ -443,8 +464,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
if (_numRowsErrored > 0) {
- _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
- _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
+ _serverMetrics
+ .addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
+ _serverMetrics
+ .addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
}
return true;
}
@@ -479,7 +502,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Throw an exception.
//
_segmentLogger
- .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed, _numRowsIndexed,
+ .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed,
+ _numRowsIndexed,
_segmentMaxRowCount);
throw new RuntimeException("Realtime segment full");
}
@@ -491,7 +515,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);
GenericRow decodedRow = _messageDecoder
- .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
+ .decode(messagesAndOffsets.getMessageAtIndex(index),
+ messagesAndOffsets.getMessageOffsetAtIndex(index),
messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
if (decodedRow != null) {
try {
@@ -500,7 +525,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
decodedRow = _complexTypeTransformer.transform(decodedRow);
}
if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
- for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+ for (Object singleRow : (Collection) decodedRow
+ .getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
realtimeRowsConsumedMeter = _serverMetrics
@@ -510,7 +536,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
} else {
realtimeRowsDroppedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+ .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED,
+ 1,
realtimeRowsDroppedMeter);
}
}
@@ -524,16 +551,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
} else {
realtimeRowsDroppedMeter = _serverMetrics
- .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+ .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED,
+ 1,
realtimeRowsDroppedMeter);
}
}
} catch (Exception e) {
- String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
+ String errorMessage = String
+ .format("Caught exception while transforming the record: %s", decodedRow);
_segmentLogger.error(errorMessage, e);
_numRowsErrored++;
_realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ .addSegmentError(_segmentNameStr,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
}
} else {
realtimeRowsDroppedMeter = _serverMetrics
@@ -548,7 +578,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
updateCurrentDocumentCountMetrics();
if (streamMessageCount != 0) {
- _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount,
+ _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
+ indexedMessageCount,
streamMessageCount, _currentOffset);
} else {
// If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
@@ -557,6 +588,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
public class PartitionConsumer implements Runnable {
+
public void run() {
long initialConsumptionEnd = 0L;
long lastCatchUpStart = 0L;
@@ -567,7 +599,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
if (_state.shouldConsume()) {
consumeLoop(); // Consume until we reached the end criteria, or we are stopped.
}
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics
+ .setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
if (_shouldStop) {
break;
}
@@ -580,7 +613,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
} else if (_state == State.CATCHING_UP) {
catchUpTimeMillis += now() - lastCatchUpStart;
_serverMetrics
- .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
+ .setValueOfTableGauge(_metricKeyName,
+ ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(catchUpTimeMillis));
}
@@ -599,8 +633,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
case CATCH_UP:
if (rspOffset.compareTo(_currentOffset) <= 0) {
// Something wrong with the controller. Back off and try again.
- _segmentLogger.error("Invalid catchup offset {} in controller response, current offset {}", rspOffset,
- _currentOffset);
+ _segmentLogger
+ .error("Invalid catchup offset {} in controller response, current offset {}",
+ rspOffset,
+ _currentOffset);
hold();
} else {
_state = State.CATCHING_UP;
@@ -631,7 +667,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Could not build segment for some reason. We can only download it.
_state = State.ERROR;
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
- new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", null));
+ new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment",
+ null));
}
break;
default:
@@ -640,13 +677,21 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
break;
case COMMIT:
_state = State.COMMITTING;
+ StreamPartitionMsgOffset endOffset = _partitionGroupConsumer.commit(_currentOffset);
+ if (endOffset != null) {
+ _currentOffset = endOffset;
+ }
+
long buildTimeSeconds = response.getBuildTimeSeconds();
+
buildSegmentForCommit(buildTimeSeconds * 1000L);
+ boolean commitSuccess = false;
if (_segmentBuildDescriptor == null) {
// We could not build the segment. Go into error state.
_state = State.ERROR;
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
- new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", null));
+ new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment",
+ null));
} else {
success = commitSegment(response.getControllerVipUrl(),
response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit());
@@ -660,9 +705,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
hold();
}
}
+ if (_state != State.COMMITTED) {
+ _partitionGroupConsumer.rollback();
+ }
+
break;
default:
- _segmentLogger.error("Holding after response from Controller: {}", response.toJsonString());
+ _segmentLogger
+ .error("Holding after response from Controller: {}", response.toJsonString());
hold();
break;
}
@@ -673,7 +723,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
postStopConsumedMsg(e.getClass().getName());
_state = State.ERROR;
_realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ .addSegmentError(_segmentNameStr,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
return;
}
@@ -682,7 +733,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
if (initialConsumptionEnd != 0L) {
_serverMetrics
- .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
+ .setValueOfTableGauge(_metricKeyName,
+ ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(now() - initialConsumptionEnd));
}
// There is a race condition that the destroy() method can be called which ends up calling stop on the consumer.
@@ -703,7 +755,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private CompletionMode getSegmentCompletionMode() {
CompletionConfig completionConfig = _tableConfig.getValidationConfig().getCompletionConfig();
if (completionConfig != null) {
- if (CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode())) {
+ if (CompletionMode.DOWNLOAD.toString()
+ .equalsIgnoreCase(completionConfig.getCompletionMode())) {
return CompletionMode.DOWNLOAD;
}
}
@@ -724,7 +777,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// built the segment successfully.
protected void buildSegmentForCommit(long buildTimeLeaseMs) {
try {
- if (_segmentBuildDescriptor != null && _segmentBuildDescriptor.getOffset().compareTo(_currentOffset) == 0) {
+ if (_segmentBuildDescriptor != null
+ && _segmentBuildDescriptor.getOffset().compareTo(_currentOffset) == 0) {
// Double-check that we have the file, just in case.
File segmentTarFile = _segmentBuildDescriptor.getSegmentTarFile();
if (segmentTarFile != null && segmentTarFile.exists()) {
@@ -734,7 +788,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
removeSegmentFile();
if (buildTimeLeaseMs <= 0) {
if (_segBuildSemaphore == null) {
- buildTimeLeaseMs = SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds() * 1000L;
+ buildTimeLeaseMs =
+ SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds() * 1000L;
} else {
// We know we are going to use a semaphore to limit number of segment builds, and could be
// blocked for a long time. The controller has not provided a lease time, so set one to
@@ -806,7 +861,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" + _segmentNameStr + "-" + now());
// lets convert the segment now
RealtimeSegmentConverter converter =
- new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema,
+ new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(),
+ _schema,
_tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(), _sortedColumn,
_invertedIndexColumns, _textIndexColumns, _fstIndexColumns, _noDictionaryColumns,
_varLengthDictionaryColumns, _nullHandlingEnabled);
@@ -821,7 +877,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
final long buildTimeMillis = now() - lockAcquireTimeMillis;
final long waitTimeMillis = lockAcquireTimeMillis - startTimeMillis;
_segmentLogger
- .info("Successfully built segment in {} ms, after lockWaitTime {} ms", buildTimeMillis, waitTimeMillis);
+ .info("Successfully built segment in {} ms, after lockWaitTime {} ms", buildTimeMillis,
+ waitTimeMillis);
File dataDir = new File(_resourceDataDir);
File indexDir = new File(dataDir, _segmentNameStr);
@@ -834,55 +891,67 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
FileUtils.moveDirectory(tempIndexDir, indexDir);
} catch (IOException e) {
String errorMessage =
- String.format("Caught exception while moving index directory from: %s to: %s", tempIndexDir, indexDir);
+ String.format("Caught exception while moving index directory from: %s to: %s",
+ tempIndexDir, indexDir);
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ .addSegmentError(_segmentNameStr,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
return null;
} finally {
FileUtils.deleteQuietly(tempSegmentFolder);
}
long segmentSizeBytes = FileUtils.sizeOfDirectory(indexDir);
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
+ _serverMetrics.setValueOfTableGauge(_metricKeyName,
+ ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(buildTimeMillis));
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
+ _serverMetrics.setValueOfTableGauge(_metricKeyName,
+ ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
if (forCommit) {
- File segmentTarFile = new File(dataDir, _segmentNameStr + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File segmentTarFile = new File(dataDir,
+ _segmentNameStr + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
try {
TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
} catch (IOException e) {
String errorMessage =
- String.format("Caught exception while taring index directory from: %s to: %s", indexDir, segmentTarFile);
+ String
+ .format("Caught exception while taring index directory from: %s to: %s", indexDir,
+ segmentTarFile);
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ .addSegmentError(_segmentNameStr,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
return null;
}
File metadataFile = SegmentDirectoryPaths.findMetadataFile(indexDir);
if (metadataFile == null) {
_segmentLogger
- .error("Failed to find file: {} under index directory: {}", V1Constants.MetadataKeys.METADATA_FILE_NAME,
+ .error("Failed to find file: {} under index directory: {}",
+ V1Constants.MetadataKeys.METADATA_FILE_NAME,
indexDir);
return null;
}
File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
if (creationMetaFile == null) {
_segmentLogger
- .error("Failed to find file: {} under index directory: {}", V1Constants.SEGMENT_CREATION_META, indexDir);
+ .error("Failed to find file: {} under index directory: {}",
+ V1Constants.SEGMENT_CREATION_META, indexDir);
return null;
}
Map<String, File> metadataFiles = new HashMap<>();
metadataFiles.put(V1Constants.MetadataKeys.METADATA_FILE_NAME, metadataFile);
metadataFiles.put(V1Constants.SEGMENT_CREATION_META, creationMetaFile);
- return new SegmentBuildDescriptor(segmentTarFile, metadataFiles, _currentOffset, buildTimeMillis,
+ return new SegmentBuildDescriptor(segmentTarFile, metadataFiles, _currentOffset,
+ buildTimeMillis,
waitTimeMillis, segmentSizeBytes);
} else {
- return new SegmentBuildDescriptor(null, null, _currentOffset, buildTimeMillis, waitTimeMillis,
+ return new SegmentBuildDescriptor(null, null, _currentOffset, buildTimeMillis,
+ waitTimeMillis,
segmentSizeBytes);
}
} catch (InterruptedException e) {
@@ -904,7 +973,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
SegmentCompletionProtocol.Response commitResponse = commit(controllerVipUrl, isSplitCommit);
- if (!commitResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
+ if (!commitResponse.getStatus()
+ .equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
return false;
}
@@ -913,7 +983,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
return true;
}
- protected SegmentCompletionProtocol.Response commit(String controllerVipUrl, boolean isSplitCommit) {
+ protected SegmentCompletionProtocol.Response commit(String controllerVipUrl,
+ boolean isSplitCommit) {
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
@@ -927,7 +998,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
SegmentCommitter segmentCommitter;
try {
- segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(isSplitCommit, params, controllerVipUrl);
+ segmentCommitter = _segmentCommitterFactory
+ .createSegmentCommitter(isSplitCommit, params, controllerVipUrl);
} catch (URISyntaxException e) {
_segmentLogger.error("Failed to create a segment committer: ", e);
return SegmentCompletionProtocol.RESP_NOT_SENT;
@@ -971,12 +1043,16 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
/**
* Cleans up the metrics that reflects the state of the realtime segment.
- * This step is essential as the instance may not be the target location for some of the partitions.
- * E.g. if the number of partitions increases, or a host swap is needed, the target location for some partitions
+ * This step is essential as the instance may not be the target location for some of the
+ * partitions.
+ * E.g. if the number of partitions increases, or a host swap is needed, the target location for
+ * some partitions
* may change,
- * and the current host remains to run. In this case, the current server would still keep the state of the old
+ * and the current host remains to run. In this case, the current server would still keep the
+ * state of the old
* partitions,
- * which no longer resides in this host any more, thus causes false positive information to the metric system.
+ * which no longer resides in this host any more, thus causes false positive information to the
+ * metric system.
*/
private void cleanupMetrics() {
_serverMetrics.removeTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING);
@@ -994,10 +1070,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
protected void postStopConsumedMsg(String reason) {
do {
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
- params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason).withSegmentName(_segmentNameStr)
+ params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason)
+ .withSegmentName(_segmentNameStr)
.withInstanceId(_instanceId);
- SegmentCompletionProtocol.Response response = _protocolHandler.segmentStoppedConsuming(params);
+ SegmentCompletionProtocol.Response response = _protocolHandler
+ .segmentStoppedConsuming(params);
if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED) {
_segmentLogger.info("Got response {}", response.toJsonString());
break;
@@ -1036,7 +1114,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
final StreamPartitionMsgOffset endOffset =
_streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
_segmentLogger
- .info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(),
+ .info(
+ "State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})",
+ _state.toString(),
_startOffset, endOffset);
stop();
_segmentLogger.info("Consumer thread stopped in state {}", _state.toString());
@@ -1058,8 +1138,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
CompletionMode segmentCompletionMode = getSegmentCompletionMode();
switch (segmentCompletionMode) {
case DOWNLOAD:
- _segmentLogger.info("State {}. CompletionMode {}. Downloading to replace", _state.toString(),
- segmentCompletionMode);
+ _segmentLogger
+ .info("State {}. CompletionMode {}. Downloading to replace", _state.toString(),
+ segmentCompletionMode);
downloadSegmentAndReplace(segmentZKMetadata);
break;
case DEFAULT:
@@ -1067,23 +1148,28 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
if (_currentOffset.compareTo(endOffset) > 0) {
// We moved ahead of the offset that is committed in ZK.
_segmentLogger
- .warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", _currentOffset,
+ .warn("Current offset {} ahead of the offset in zk {}. Downloading to replace",
+ _currentOffset,
endOffset);
downloadSegmentAndReplace(segmentZKMetadata);
} else if (_currentOffset.compareTo(endOffset) == 0) {
_segmentLogger
- .info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset, endOffset);
+ .info("Current offset {} matches offset in zk {}. Replacing segment",
+ _currentOffset, endOffset);
buildSegmentAndReplace();
} else {
- _segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset, endOffset);
+ _segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset,
+ endOffset);
boolean success = catchupToFinalOffset(endOffset,
- TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS));
+ TimeUnit.MILLISECONDS
+ .convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS));
if (success) {
_segmentLogger.info("Caught up to offset {}", _currentOffset);
buildSegmentAndReplace();
} else {
_segmentLogger
- .info("Could not catch up to offset (current = {}). Downloading to replace", _currentOffset);
+ .info("Could not catch up to offset (current = {}). Downloading to replace",
+ _currentOffset);
downloadSegmentAndReplace(segmentZKMetadata);
}
}
@@ -1093,7 +1179,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
break;
default:
- _segmentLogger.info("Downloading to replace segment while in state {}", _state.toString());
+ _segmentLogger
+ .info("Downloading to replace segment while in state {}", _state.toString());
downloadSegmentAndReplace(segmentZKMetadata);
break;
}
@@ -1107,7 +1194,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata) {
closeStreamConsumers();
_realtimeTableDataManager
- .downloadAndReplaceSegment(_segmentNameStr, segmentZKMetadata, _indexLoadingConfig, _tableConfig);
+ .downloadAndReplaceSegment(_segmentNameStr, segmentZKMetadata, _indexLoadingConfig,
+ _tableConfig);
}
protected long now() {
@@ -1130,7 +1218,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
if (_currentOffset.compareTo(endOffset) != 0) {
// Timeout?
- _segmentLogger.error("Could not consume up to {} (current offset {})", endOffset, _currentOffset);
+ _segmentLogger
+ .error("Could not consume up to {} (current offset {})", endOffset, _currentOffset);
return false;
}
@@ -1175,9 +1264,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Assume that this is called only on OFFLINE to CONSUMING transition.
// If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here.
public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
- RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
+ RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir,
+ IndexLoadingConfig indexLoadingConfig,
Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore,
- ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+ ServerMetrics serverMetrics,
+ @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
_segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
_segmentZKMetadata = segmentZKMetadata;
_tableConfig = tableConfig;
@@ -1190,16 +1281,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getServerInstance();
_leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
- _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
+ _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics,
+ _tableNameWithType);
String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
// TODO Validate configs
IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
_partitionLevelStreamConfig =
- new PartitionLevelStreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig));
+ new PartitionLevelStreamConfig(_tableNameWithType,
+ IngestionConfigUtils.getStreamConfigMap(_tableConfig));
_streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig);
_streamPartitionMsgOffsetFactory =
- StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory();
+ StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig)
+ .createStreamMsgOffsetFactory();
_streamTopic = _partitionLevelStreamConfig.getTopicName();
_segmentNameStr = _segmentZKMetadata.getSegmentName();
_llcSegmentName = llcSegmentName;
@@ -1213,26 +1307,31 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
_metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId;
- _segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
+ _segmentLogger = LoggerFactory
+ .getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
_tableStreamName = _tableNameWithType + "_" + _streamTopic;
_memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
- indexLoadingConfig.isRealtimeOffHeapAllocation(), indexLoadingConfig.isDirectRealtimeOffHeapAllocation(),
+ indexLoadingConfig.isRealtimeOffHeapAllocation(),
+ indexLoadingConfig.isDirectRealtimeOffHeapAllocation(),
serverMetrics);
List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
if (sortedColumns.isEmpty()) {
- _segmentLogger.info("RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}",
+ _segmentLogger.info(
+ "RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}",
_llcSegmentName);
_sortedColumn = null;
} else {
String firstSortedColumn = sortedColumns.get(0);
if (_schema.hasColumn(firstSortedColumn)) {
- _segmentLogger.info("Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}",
+ _segmentLogger.info(
+ "Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}",
firstSortedColumn, _llcSegmentName);
_sortedColumn = firstSortedColumn;
} else {
_segmentLogger
- .warn("Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.",
+ .warn(
+ "Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.",
firstSortedColumn, _llcSegmentName);
_sortedColumn = null;
}
@@ -1251,7 +1350,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// No dictionary Columns
_noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns());
- _varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns());
+ _varLengthDictionaryColumns = new ArrayList<>(
+ indexLoadingConfig.getVarLengthDictionaryColumns());
// Read the max number of rows
int segmentMaxRowCount = _partitionLevelStreamConfig.getFlushThresholdRows();
@@ -1274,26 +1374,32 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Start new realtime segment
String consumerDir = realtimeTableDataManager.getConsumerDir();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
- new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentNameStr)
+ new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType)
+ .setSegmentName(_segmentNameStr)
.setStreamName(_streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
- .setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
+ .setCapacity(_segmentMaxRowCount)
+ .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
.setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
.setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
.setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(textIndexColumns)
- .setFSTIndexColumns(fstIndexColumns).setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
- .setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()).setSegmentZKMetadata(segmentZKMetadata)
+ .setFSTIndexColumns(fstIndexColumns)
+ .setJsonIndexColumns(indexLoadingConfig.getJsonIndexColumns())
+ .setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs())
+ .setSegmentZKMetadata(segmentZKMetadata)
.setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
- .setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled)
+ .setAggregateMetrics(indexingConfig.isAggregateMetrics())
+ .setNullHandlingEnabled(_nullHandlingEnabled)
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setHashFunction(tableConfig.getHashFunction())
.setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn());
// Create message decoder
- Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
+ Set<String> fieldsToRead = IngestionUtils
+ .getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
_messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
- _clientId = _streamTopic + "-" + _partitionGroupId;
+ _clientId = _instanceId + "-" + _streamTopic + "-" + _partitionGroupId;
// Create record transformer
_recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
@@ -1315,9 +1421,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
SegmentPartitionConfig segmentPartitionConfig = indexingConfig.getSegmentPartitionConfig();
if (segmentPartitionConfig != null) {
- Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
+ Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig
+ .getColumnPartitionMap();
if (columnPartitionMap.size() == 1) {
- Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator().next();
+ Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator()
+ .next();
String partitionColumn = entry.getKey();
ColumnPartitionConfig columnPartitionConfig = entry.getValue();
String partitionFunctionName = columnPartitionConfig.getFunctionName();
@@ -1342,7 +1450,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentLogger.warn(
"Number of stream partitions: {} does not match number of partitions in the partition config: {}, "
+ "using number of stream " + "partitions", numPartitionGroups, numPartitions);
- _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
+ _serverMetrics
+ .addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH,
+ 1);
numPartitions = numPartitionGroups;
}
} catch (Exception e) {
@@ -1354,10 +1464,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn);
realtimeSegmentConfigBuilder
- .setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions));
+ .setPartitionFunction(PartitionFunctionFactory
+ .getPartitionFunction(partitionFunctionName, numPartitions));
realtimeSegmentConfigBuilder.setPartitionId(_partitionGroupId);
} else {
- _segmentLogger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
+ _segmentLogger
+ .warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
}
}
@@ -1374,10 +1486,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
try (StreamMetadataProvider metadataProvider = _streamConsumerFactory
.createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
_latestStreamOffsetAtStartupTime = metadataProvider
- .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000);
+ .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/
+ 5000);
} catch (Exception e) {
- _segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId,
- _partitionGroupId);
+ _segmentLogger
+ .warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}",
+ _clientId,
+ _partitionGroupId);
}
long now = now();
@@ -1394,16 +1509,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// cases, we let some minimum consumption happen before we attempt to complete the segment (unless, of course
// the max consumption time has been configured to be less than the minimum time we use in this class).
long minConsumeTimeMillis =
- Math.min(maxConsumeTimeMillis, TimeUnit.MILLISECONDS.convert(MINIMUM_CONSUME_TIME_MINUTES, TimeUnit.MINUTES));
+ Math.min(maxConsumeTimeMillis,
+ TimeUnit.MILLISECONDS.convert(MINIMUM_CONSUME_TIME_MINUTES, TimeUnit.MINUTES));
if (_consumeEndTime - now < minConsumeTimeMillis) {
_consumeEndTime = now + minConsumeTimeMillis;
}
_segmentCommitterFactory =
- new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
+ new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig,
+ indexLoadingConfig, serverMetrics);
_segmentLogger
- .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName,
+ .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",
+ _llcSegmentName,
_segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC).toString());
start();
}
@@ -1417,7 +1535,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
_segmentLogger.info("Creating new stream consumer, reason: {}", reason);
_partitionGroupConsumer =
- _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+ _streamConsumerFactory
+ .createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
}
/**
@@ -1441,7 +1560,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Number of rows indexed should be used for DOCUMENT_COUNT metric, and also for segment flush. Whereas,
// Number of rows consumed should be used for consumption metric.
long rowsIndexed = _numRowsIndexed - _lastUpdatedRowsIndexed.get();
- _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed);
+ _serverMetrics
+ .addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed);
_lastUpdatedRowsIndexed.set(_numRowsIndexed);
final long now = now();
final int rowsConsumed = _numRowsConsumed - _lastConsumedCount;
@@ -1451,7 +1571,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
|| rowsConsumed >= MSG_COUNT_THRESHOLD_FOR_LOG) {
_segmentLogger.info(
"Consumed {} events from (rate:{}/s), currentOffset={}, numRowsConsumedSoFar={}, numRowsIndexedSoFar={}",
- rowsConsumed, (float) (rowsConsumed) * 1000 / (now - prevTime), _currentOffset, _numRowsConsumed,
+ rowsConsumed, (float) (rowsConsumed) * 1000 / (now - prevTime), _currentOffset,
+ _numRowsConsumed,
_numRowsIndexed);
_lastConsumedCount = _numRowsConsumed;
_lastLogTime = now;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index 735c91d..9748945 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -21,12 +21,15 @@ package org.apache.pinot.spi.stream;
import java.io.Closeable;
import java.util.concurrent.TimeoutException;
-
/**
* Consumer interface for consuming from a partition group of a stream
*/
public interface PartitionGroupConsumer extends Closeable {
+ default void start(StreamPartitionMsgOffset startOffset) {
+
+ }
+
/**
* Fetch messages and offsets from the stream partition group
*
@@ -37,6 +40,16 @@ public interface PartitionGroupConsumer extends Closeable {
* milliseconds
* @return An iterable containing messages fetched from the stream partition and their offsets
*/
- MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, int timeoutMs)
+ MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset,
+ StreamPartitionMsgOffset endOffset, int timeoutMs)
throws TimeoutException;
+
+ default StreamPartitionMsgOffset commit(
+ final StreamPartitionMsgOffset currentOffset) {
+ return null;
+ }
+
+ default void rollback() {
+
+ }
}
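
The new hooks make the consumer lifecycle explicit: start() positions the consumer before the first fetch, commit(currentOffset) acknowledges consumption and may return the stream's authoritative end offset (which the data manager adopts as its current offset), and rollback() lets a pub-sub system redeliver unacknowledged messages when the segment does not reach COMMITTED. Below is a minimal sketch of an implementation against this interface, assuming a hypothetical transactional pub-sub client — everything named Tx* is an assumption for illustration, not a real Pinot or client-library API:

package org.example.pubsub; // hypothetical package, not part of this commit

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

/**
 * Sketch of a consumer for a transactional pub-sub system, showing how the
 * new lifecycle hooks compose. The TxClient interface below is a stand-in,
 * not a real client library.
 */
public class TxPubSubConsumer implements PartitionGroupConsumer {

  /** Hypothetical transactional pub-sub client. */
  interface TxClient {
    void seek(StreamPartitionMsgOffset offset);
    MessageBatch poll(int timeoutMs) throws TimeoutException;
    StreamPartitionMsgOffset commitTransaction(StreamPartitionMsgOffset upTo);
    void abortTransaction();
    void close() throws IOException;
  }

  private final TxClient _client;

  public TxPubSubConsumer(TxClient client) {
    _client = client;
  }

  @Override
  public void start(StreamPartitionMsgOffset startOffset) {
    // Position the underlying client once, before the first fetch.
    _client.seek(startOffset);
  }

  @Override
  public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset,
      StreamPartitionMsgOffset endOffset, int timeoutMs)
      throws TimeoutException {
    // The consume loop in LLRealtimeSegmentDataManager passes a null
    // endOffset; a transactional stream may ignore startOffset once
    // start() has seeked to the initial position.
    return _client.poll(timeoutMs);
  }

  @Override
  public StreamPartitionMsgOffset commit(StreamPartitionMsgOffset currentOffset) {
    // Acknowledge everything consumed so far. Returning a non-null offset
    // lets the server adopt the stream's authoritative end offset.
    return _client.commitTransaction(currentOffset);
  }

  @Override
  public void rollback() {
    // Invoked when the segment did not reach COMMITTED; un-acknowledge the
    // in-flight messages so they are redelivered to the next consumer.
    _client.abortTransaction();
  }

  @Override
  public void close() throws IOException {
    _client.close();
  }
}

Because all three new methods carry default no-op (or null-returning) bodies, existing offset-based implementations compile unchanged; only consumers for transactional pub-sub systems need to override them.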