You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/09 02:00:19 UTC
samza git commit: SAMZA-1589: Reduce failure retry duration in KafkaCheckpointManager.writeCheckpoint
Repository: samza
Updated Branches:
refs/heads/master 89dc18e96 -> 6492826e1
SAMZA-1589: Reduce failure retry duration in KafkaCheckpointManager.writeCheckpoint
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #438 from shanthoosh/SAMZA-1589
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6492826e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6492826e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6492826e
Branch: refs/heads/master
Commit: 6492826e15ffbe701d5a6a0cc4a4b2a06299c718
Parents: 89dc18e
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Thu Mar 8 18:00:15 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Thu Mar 8 18:00:15 2018 -0800
----------------------------------------------------------------------
.../samza/checkpoint/kafka/KafkaCheckpointManager.scala | 9 +++++----
.../samza/checkpoint/kafka/TestKafkaCheckpointManager.scala | 2 +-
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/6492826e/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 2dd9569..b090136 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -20,6 +20,7 @@
package org.apache.samza.checkpoint.kafka
import java.util.Collections
+import java.util.concurrent.TimeUnit
import com.google.common.base.Preconditions
import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
@@ -53,8 +54,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde,
checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {
- // Retry duration is approximately 83 minutes.
- var MaxRetriesOnFailure = 50
+ var MaxRetryDurationMs = TimeUnit.MINUTES.toMillis(15);
info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, systemName:$checkpointSystem " +
s"validateCheckpoints:$validateCheckpoint")
@@ -158,6 +158,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
val envelope = new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
+ val startTime = System.currentTimeMillis()
retryBackoff.run(
loop => {
systemProducer.send(taskName.getTaskName, envelope)
@@ -167,8 +168,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
},
(exception, loop) => {
- if (loop.sleepCount >= MaxRetriesOnFailure) {
- error(s"Exhausted $MaxRetriesOnFailure retries when writing checkpoint: $checkpoint for task: $taskName.")
+ if ((System.currentTimeMillis() - startTime) >= MaxRetryDurationMs) {
+ error(s"Exhausted $MaxRetryDurationMs milliseconds when writing checkpoint: $checkpoint for task: $taskName.")
throw new SamzaException(s"Exception when writing checkpoint: $checkpoint for task: $taskName.", exception)
} else {
warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)
http://git-wip-us.apache.org/repos/asf/samza/blob/6492826e/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 5586a1a..03b0d2c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -115,7 +115,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, false, props)
val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry)
- checkPointManager.MaxRetriesOnFailure = 1
+ checkPointManager.MaxRetryDurationMs = 1
checkPointManager.register(taskName)
checkPointManager.start