You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/10 19:37:37 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1189] Relax the condition for the increasing ingestion latency check[]

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b9744e  [GOBBLIN-1189] Relax the condition for the increasing ingestion latency check[]
8b9744e is described below

commit 8b9744e21f9bfb7ff2d01391eed28bb0b8799b33
Author: sv2000 <su...@gmail.com>
AuthorDate: Wed Jun 10 12:37:30 2020 -0700

    [GOBBLIN-1189] Relax the condition for the increasing ingestion latency check[]
    
    Closes #3037 from sv2000/latencyHealthCheck
---
 .../source/extractor/extract/kafka/KafkaIngestionHealthCheck.java       | 2 +-
 .../source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
index d7a8346..9737591 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java
@@ -94,7 +94,7 @@ public class KafkaIngestionHealthCheck implements CommitStep {
         return false;
       } else {
         if (this.increasingLatencyCheckEnabled) {
-          if (previousLatency >= ingestionLatency) {
+          if (previousLatency > ingestionLatency) {
             return false;
           }
           previousLatency = ingestionLatency;
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
index ff7049f..3268164 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java
@@ -64,7 +64,7 @@ public class KafkaIngestionHealthCheckTest {
     Mockito.when(extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES))
         .thenReturn(6L)
         .thenReturn(7L)
-        .thenReturn(10L)
+        .thenReturn(7L)
         .thenReturn(5L);
     Mockito.when(extractorStatsTracker.getConsumptionRateMBps())
         .thenReturn(2.0)