You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/03/01 23:12:21 UTC

[pinot] branch master updated: Adding table name to create unique Kafka consumer client Ids (#10324)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 697adabc2e Adding table name to create unique Kafka consumer client Ids (#10324)
697adabc2e is described below

commit 697adabc2e80793107fc40907d0ffefad6d907cf
Author: swaminathanmanish <12...@users.noreply.github.com>
AuthorDate: Wed Mar 1 15:12:14 2023 -0800

    Adding table name to create unique Kafka consumer client Ids (#10324)
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 47 ++++++++++------------
 .../spi/stream/PartitionGroupMetadataFetcher.java  |  3 +-
 2 files changed, 24 insertions(+), 26 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4d82f6953e..28a24ad937 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -231,7 +231,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // modify the permit. This boolean make sure the semaphore gets released only once when the partition group stops
   // consuming.
   private final AtomicBoolean _acquiredConsumerSemaphore;
-  private final String _metricKeyName;
   private final ServerMetrics _serverMetrics;
   private final BooleanSupplier _isReadyToConsumeData;
   private final MutableSegmentImpl _realtimeSegment;
@@ -455,10 +454,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         // TODO Issue 5359 Need to find a way to bump metrics without getting actual offset value.
         if (_currentOffset instanceof LongMsgOffset) {
           // TODO: only LongMsgOffset supplies long offset value.
-          _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
+          _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
               ((LongMsgOffset) _currentOffset).getOffset());
         }
-        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
+        _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 1);
         lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
       } else if (endCriteriaReached) {
         // At this point current offset has not moved because processStreamEvents() has exited before processing a
@@ -489,7 +488,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           if (totalIdleTimeMillis > idleTimeoutMillis) {
             // Update the partition-consuming metric only if we have been idling beyond idle timeout.
             // Create a new stream consumer wrapper, in case we are stuck on something.
-            _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
+            _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 1);
             recreateStreamConsumer(
                 String.format("Total idle time: %d ms exceeded idle timeout: %d ms", totalIdleTimeMillis,
                     idleTimeoutMillis));
@@ -505,7 +504,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
 
     if (_numRowsErrored > 0) {
-      _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
       _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
     }
     return true;
@@ -568,7 +567,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
         // decode error
         realtimeRowsDroppedMeter =
-            _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+            _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
                 realtimeRowsDroppedMeter);
         _numRowsErrored++;
       } else {
@@ -584,12 +583,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         }
         if (reusedResult.getSkippedRowCount() > 0) {
           realtimeRowsDroppedMeter =
-              _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_FILTERED,
+              _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FILTERED,
                   reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
         }
         if (reusedResult.getIncompleteRowCount() > 0) {
           realtimeIncompleteRowsConsumedMeter =
-              _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
+              _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
                   reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter);
         }
         for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
@@ -599,7 +598,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             _lastRowMetadata = msgMetadata;
             _lastConsumedTimestampMs = System.currentTimeMillis();
             realtimeRowsConsumedMeter =
-                _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
+                _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
                     realtimeRowsConsumedMeter);
           } catch (Exception e) {
             _numRowsErrored++;
@@ -647,20 +646,20 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           if (_state.shouldConsume()) {
             consumeLoop();  // Consume until we reached the end criteria, or we are stopped.
           }
-          _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+          _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
           if (_shouldStop) {
             break;
           }
 
           if (_state == State.INITIAL_CONSUMING) {
             initialConsumptionEnd = now();
-            _serverMetrics.setValueOfTableGauge(_metricKeyName,
+            _serverMetrics.setValueOfTableGauge(_clientId,
                 ServerGauge.LAST_REALTIME_SEGMENT_INITIAL_CONSUMPTION_DURATION_SECONDS,
                 TimeUnit.MILLISECONDS.toSeconds(initialConsumptionEnd - _startTimeMs));
           } else if (_state == State.CATCHING_UP) {
             catchUpTimeMillis += now() - lastCatchUpStart;
             _serverMetrics
-                .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
+                .setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
                     TimeUnit.MILLISECONDS.toSeconds(catchUpTimeMillis));
           }
 
@@ -752,7 +751,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         _state = State.ERROR;
         _realtimeTableDataManager
             .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
-        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+        _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
         return;
       }
 
@@ -760,7 +759,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
       if (initialConsumptionEnd != 0L) {
         _serverMetrics
-            .setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
+            .setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
                 TimeUnit.MILLISECONDS.toSeconds(now() - initialConsumptionEnd));
       }
       // There is a race condition that the destroy() method can be called which ends up calling stop on the consumer.
@@ -770,7 +769,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       // so it is ok not to mark it non-consuming, as the main thread will clean up this metric in destroy() method
       // as the final step.
       if (!_shouldStop) {
-        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+        _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
       }
     }
   }
@@ -936,9 +935,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       }
 
       long segmentSizeBytes = FileUtils.sizeOfDirectory(indexDir);
-      _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
+      _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
           TimeUnit.MILLISECONDS.toSeconds(buildTimeMillis));
-      _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
+      _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
           TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
 
       if (forCommit) {
@@ -1082,7 +1081,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
    * which no longer resides in this host any more, thus causes false positive information to the metric system.
    */
   private void cleanupMetrics() {
-    _serverMetrics.removeTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING);
+    _serverMetrics.removeTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING);
   }
 
   protected void hold() {
@@ -1131,7 +1130,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   public void goOnlineFromConsuming(SegmentZKMetadata segmentZKMetadata)
       throws InterruptedException {
-    _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+    _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
     try {
       // Remove the segment file before we do anything else.
       removeSegmentFile();
@@ -1202,7 +1201,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     } catch (Exception e) {
       Utils.rethrowException(e);
     } finally {
-      _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+      _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
     }
   }
 
@@ -1228,7 +1227,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       _segmentLogger.warn("Exception when catching up to final offset", e);
       return false;
     } finally {
-      _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
+      _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
     }
     if (_currentOffset.compareTo(endOffset) != 0) {
       // Timeout?
@@ -1318,7 +1317,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             _segmentZKMetadata.getStatus().toString());
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
-    _metricKeyName = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId;
+    _clientId = _tableNameWithType + "-" + streamTopic + "-" + _partitionGroupId;
     _segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
     _tableStreamName = _tableNameWithType + "_" + streamTopic;
     _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
@@ -1326,7 +1325,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         serverMetrics);
 
     _rateLimiter = RealtimeConsumptionRateManager.getInstance()
-        .createRateLimiter(_partitionLevelStreamConfig, _tableNameWithType, _serverMetrics, _metricKeyName);
+        .createRateLimiter(_partitionLevelStreamConfig, _tableNameWithType, _serverMetrics, _clientId);
 
     List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
     String sortedColumn;
@@ -1402,8 +1401,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
     StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
     _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
-    _clientId = streamTopic + "-" + _partitionGroupId;
-
     _transformPipeline = new TransformPipeline(tableConfig, schema);
     // Acquire semaphore to create stream consumers
     try {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 6ffdbe03cc..69ad7c9ac1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -63,7 +63,8 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
   @Override
   public Boolean call()
       throws Exception {
-    String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + _topicName;
+    String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-"
+            + _streamConfig.getTableNameWithType() + "-" + _topicName;
     try (
         StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) {
       _newPartitionGroupMetadataList = streamMetadataProvider.computePartitionGroupMetadata(clientId, _streamConfig,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org