You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/06/18 00:08:37 UTC

[incubator-pinot] branch master updated: Fix idle count bug in realtime consumption (#4327)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ad049f  Fix idle count bug in realtime consumption (#4327)
4ad049f is described below

commit 4ad049faf874b1726806455fda31d7a9d93da207
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jun 17 17:08:32 2019 -0700

    Fix idle count bug in realtime consumption (#4327)
---
 .../core/data/manager/realtime/LLRealtimeSegmentDataManager.java   | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 59d2bb8..61ed73c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -344,7 +344,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
         .getFetchTimeoutMillis());  // 3 minute count
     long lastUpdatedOffset = _currentOffset;  // so that we always update the metric when we enter this method.
-    long idleCount = 0;
+    long consecutiveIdleCount = 0;
     // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
     // anymore. Remove the file if it exists.
     removeSegmentFile();
@@ -378,6 +378,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
 
       if (_currentOffset != lastUpdatedOffset) {
+        consecutiveIdleCount = 0;
         // We consumed something. Update the highest stream offset as well as partition-consuming metric.
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, _currentOffset);
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, _currentOffset);
@@ -386,9 +387,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       } else {
         // We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long time.
         // Create a new stream consumer wrapper, in case we are stuck on something.
-        if (++idleCount > maxIdleCountBeforeStatUpdate) {
+        if (++consecutiveIdleCount > maxIdleCountBeforeStatUpdate) {
           _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
-          idleCount = 0;
+          consecutiveIdleCount = 0;
           makeStreamConsumer("Idle for too long");
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org