You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/03/01 23:12:21 UTC
[pinot] branch master updated: Adding table name to create unique Kafka consumer client Ids (#10324)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 697adabc2e Adding table name to create unique Kafka consumer client Ids (#10324)
697adabc2e is described below
commit 697adabc2e80793107fc40907d0ffefad6d907cf
Author: swaminathanmanish <12...@users.noreply.github.com>
AuthorDate: Wed Mar 1 15:12:14 2023 -0800
Adding table name to create unique Kafka consumer client Ids (#10324)
---
.../realtime/LLRealtimeSegmentDataManager.java | 47 ++++++++++------------
.../spi/stream/PartitionGroupMetadataFetcher.java | 3 +-
2 files changed, 24 insertions(+), 26 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4d82f6953e..28a24ad937 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -231,7 +231,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// modify the permit. This boolean make sure the semaphore gets released only once when the partition group stops
// consuming.
private final AtomicBoolean _acquiredConsumerSemaphore;
- private final String _metricKeyName;
private final ServerMetrics _serverMetrics;
private final BooleanSupplier _isReadyToConsumeData;
private final MutableSegmentImpl _realtimeSegment;
@@ -455,10 +454,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// TODO Issue 5359 Need to find a way to bump metrics without getting actual offset value.
if (_currentOffset instanceof LongMsgOffset) {
// TODO: only LongMsgOffset supplies long offset value.
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
((LongMsgOffset) _currentOffset).getOffset());
}
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 1);
lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
} else if (endCriteriaReached) {
// At this point current offset has not moved because processStreamEvents() has exited before processing a
@@ -489,7 +488,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
if (totalIdleTimeMillis > idleTimeoutMillis) {
// Update the partition-consuming metric only if we have been idling beyond idle timeout.
// Create a new stream consumer wrapper, in case we are stuck on something.
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 1);
recreateStreamConsumer(
String.format("Total idle time: %d ms exceeded idle timeout: %d ms", totalIdleTimeMillis,
idleTimeoutMillis));
@@ -505,7 +504,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
if (_numRowsErrored > 0) {
- _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
+ _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
_serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
}
return true;
@@ -568,7 +567,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
// decode error
realtimeRowsDroppedMeter =
- _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+ _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
realtimeRowsDroppedMeter);
_numRowsErrored++;
} else {
@@ -584,12 +583,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
if (reusedResult.getSkippedRowCount() > 0) {
realtimeRowsDroppedMeter =
- _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_FILTERED,
+ _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FILTERED,
reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
}
if (reusedResult.getIncompleteRowCount() > 0) {
realtimeIncompleteRowsConsumedMeter =
- _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
+ _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter);
}
for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
@@ -599,7 +598,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_lastRowMetadata = msgMetadata;
_lastConsumedTimestampMs = System.currentTimeMillis();
realtimeRowsConsumedMeter =
- _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
+ _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
realtimeRowsConsumedMeter);
} catch (Exception e) {
_numRowsErrored++;
@@ -647,20 +646,20 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
if (_state.shouldConsume()) {
consumeLoop(); // Consume until we reached the end criteria, or we are stopped.
}
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
if (_shouldStop) {
break;
}
if (_state == State.INITIAL_CONSUMING) {
initialConsumptionEnd = now();
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LAST_REALTIME_SEGMENT_INITIAL_CONSUMPTION_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(initialConsumptionEnd - _startTimeMs));
} else if (_state == State.CATCHING_UP) {
catchUpTimeMillis += now() - lastCatchUpStart;
_serverMetrics
- .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
+ .setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(catchUpTimeMillis));
}
@@ -752,7 +751,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_state = State.ERROR;
_realtimeTableDataManager
.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
return;
}
@@ -760,7 +759,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
if (initialConsumptionEnd != 0L) {
_serverMetrics
- .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
+ .setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(now() - initialConsumptionEnd));
}
// There is a race condition that the destroy() method can be called which ends up calling stop on the consumer.
@@ -770,7 +769,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// so it is ok not to mark it non-consuming, as the main thread will clean up this metric in destroy() method
// as the final step.
if (!_shouldStop) {
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
}
}
}
@@ -936,9 +935,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
long segmentSizeBytes = FileUtils.sizeOfDirectory(indexDir);
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(buildTimeMillis));
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
if (forCommit) {
@@ -1082,7 +1081,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
* which no longer resides in this host any more, thus causes false positive information to the metric system.
*/
private void cleanupMetrics() {
- _serverMetrics.removeTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING);
+ _serverMetrics.removeTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING);
}
protected void hold() {
@@ -1131,7 +1130,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
public void goOnlineFromConsuming(SegmentZKMetadata segmentZKMetadata)
throws InterruptedException {
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
try {
// Remove the segment file before we do anything else.
removeSegmentFile();
@@ -1202,7 +1201,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
} catch (Exception e) {
Utils.rethrowException(e);
} finally {
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
}
}
@@ -1228,7 +1227,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentLogger.warn("Exception when catching up to final offset", e);
return false;
} finally {
- _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
}
if (_currentOffset.compareTo(endOffset) != 0) {
// Timeout?
@@ -1318,7 +1317,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentZKMetadata.getStatus().toString());
_partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
- _metricKeyName = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId;
+ _clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId;
_segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
_tableStreamName = _tableNameWithType + "_" + streamTopic;
_memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
@@ -1326,7 +1325,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
serverMetrics);
_rateLimiter = RealtimeConsumptionRateManager.getInstance()
- .createRateLimiter(_partitionLevelStreamConfig, _tableNameWithType, _serverMetrics, _metricKeyName);
+ .createRateLimiter(_partitionLevelStreamConfig, _tableNameWithType, _serverMetrics, _clientId);
List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
String sortedColumn;
@@ -1402,8 +1401,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
_streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
- _clientId = streamTopic + "-" + _partitionGroupId;
-
_transformPipeline = new TransformPipeline(tableConfig, schema);
// Acquire semaphore to create stream consumers
try {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 6ffdbe03cc..69ad7c9ac1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -63,7 +63,8 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
@Override
public Boolean call()
throws Exception {
- String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + _topicName;
+ String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-"
+ + _streamConfig.getTableNameWithType() + "-" + _topicName;
try (
StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) {
_newPartitionGroupMetadataList = streamMetadataProvider.computePartitionGroupMetadata(clientId, _streamConfig,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org