You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/06/18 00:08:37 UTC
[incubator-pinot] branch master updated: Fix idle count bug in
realtime consumption (#4327)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4ad049f Fix idle count bug in realtime consumption (#4327)
4ad049f is described below
commit 4ad049faf874b1726806455fda31d7a9d93da207
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jun 17 17:08:32 2019 -0700
Fix idle count bug in realtime consumption (#4327)
---
.../core/data/manager/realtime/LLRealtimeSegmentDataManager.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 59d2bb8..61ed73c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -344,7 +344,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
.getFetchTimeoutMillis()); // 3 minute count
long lastUpdatedOffset = _currentOffset; // so that we always update the metric when we enter this method.
- long idleCount = 0;
+ long consecutiveIdleCount = 0;
// At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
// anymore. Remove the file if it exists.
removeSegmentFile();
@@ -378,6 +378,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
if (_currentOffset != lastUpdatedOffset) {
+ consecutiveIdleCount = 0;
// We consumed something. Update the highest stream offset as well as partition-consuming metric.
_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, _currentOffset);
_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, _currentOffset);
@@ -386,9 +387,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
} else {
// We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long time.
// Create a new stream consumer wrapper, in case we are stuck on something.
- if (++idleCount > maxIdleCountBeforeStatUpdate) {
+ if (++consecutiveIdleCount > maxIdleCountBeforeStatUpdate) {
_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
- idleCount = 0;
+ consecutiveIdleCount = 0;
makeStreamConsumer("Idle for too long");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org