You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text extraction.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/09 02:00:19 UTC

samza git commit: SAMZA-1589: Reduce failure retry duration in KafkaCheckpointManager.writeCheckpoint

Repository: samza
Updated Branches:
  refs/heads/master 89dc18e96 -> 6492826e1


SAMZA-1589: Reduce failure retry duration in KafkaCheckpointManager.writeCheckpoint

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #438 from shanthoosh/SAMZA-1589


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6492826e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6492826e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6492826e

Branch: refs/heads/master
Commit: 6492826e15ffbe701d5a6a0cc4a4b2a06299c718
Parents: 89dc18e
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Thu Mar 8 18:00:15 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Thu Mar 8 18:00:15 2018 -0800

----------------------------------------------------------------------
 .../samza/checkpoint/kafka/KafkaCheckpointManager.scala     | 9 +++++----
 .../samza/checkpoint/kafka/TestKafkaCheckpointManager.scala | 2 +-
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6492826e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 2dd9569..b090136 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -20,6 +20,7 @@
 package org.apache.samza.checkpoint.kafka
 
 import java.util.Collections
+import java.util.concurrent.TimeUnit
 
 import com.google.common.base.Preconditions
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
@@ -53,8 +54,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
                              checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde,
                              checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {
 
-  // Retry duration is approximately 83 minutes.
-  var MaxRetriesOnFailure = 50
+  var MaxRetryDurationMs = TimeUnit.MINUTES.toMillis(15);
 
   info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, systemName:$checkpointSystem " +
     s"validateCheckpoints:$validateCheckpoint")
@@ -158,6 +158,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     val envelope = new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
     val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
 
+    val startTime = System.currentTimeMillis()
     retryBackoff.run(
       loop => {
         systemProducer.send(taskName.getTaskName, envelope)
@@ -167,8 +168,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
       },
 
       (exception, loop) => {
-        if (loop.sleepCount >= MaxRetriesOnFailure) {
-          error(s"Exhausted $MaxRetriesOnFailure retries when writing checkpoint: $checkpoint for task: $taskName.")
+        if ((System.currentTimeMillis() - startTime) >= MaxRetryDurationMs) {
+          error(s"Exhausted $MaxRetryDurationMs milliseconds when writing checkpoint: $checkpoint for task: $taskName.")
           throw new SamzaException(s"Exception when writing checkpoint: $checkpoint for task: $taskName.", exception)
         } else {
           warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)

http://git-wip-us.apache.org/repos/asf/samza/blob/6492826e/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 5586a1a..03b0d2c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -115,7 +115,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
     val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, false, props)
     val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry)
-    checkPointManager.MaxRetriesOnFailure = 1
+    checkPointManager.MaxRetryDurationMs = 1
 
     checkPointManager.register(taskName)
     checkPointManager.start