You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/05 20:04:35 UTC
git commit: SAMZA-168; add exponential backoff to kafka system admin
Repository: incubator-samza
Updated Branches:
refs/heads/master f5741125a -> 5ae028595
SAMZA-168; add exponential backoff to kafka system admin
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5ae02859
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5ae02859
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5ae02859
Branch: refs/heads/master
Commit: 5ae02859552778193e4247daa65a00414a8708c1
Parents: f574112
Author: Martin Kleppmann <ma...@rapportive.com>
Authored: Wed Mar 5 11:04:27 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Mar 5 11:04:27 2014 -0800
----------------------------------------------------------------------
.../samza/util/ExponentialSleepStrategy.scala | 45 ++++++++++++++
.../util/TestExponentialSleepStrategy.scala | 57 +++++++++++++++++
.../apache/samza/system/kafka/BrokerProxy.scala | 4 +-
.../samza/system/kafka/KafkaSystemAdmin.scala | 3 +
.../util/ExponentialThreadSleepStrategy.scala | 65 --------------------
.../TestExponentialThreadSleepStrategy.scala | 47 --------------
6 files changed, 107 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
new file mode 100644
index 0000000..b3c9263
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+class ExponentialSleepStrategy(
+ backOffMultiplier: Double = 2.0,
+ initialDelayMs: Long = 100,
+ maximumDelayMs: Long = 10000) {
+
+ require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
+ require(initialDelayMs > 0, "initialDelayMs must be positive")
+ require(maximumDelayMs >= initialDelayMs, "maximumDelayMs must be >= initialDelayMs")
+
+ var previousDelay = 0L
+
+ def sleep() = {
+ val nextDelay = getNextDelay(previousDelay)
+ Thread.sleep(nextDelay)
+ previousDelay = nextDelay
+ }
+
+ def getNextDelay(previousDelay: Long): Long = {
+ val nextDelay = (previousDelay * backOffMultiplier).asInstanceOf[Long]
+ math.min(math.max(initialDelayMs, nextDelay), maximumDelayMs)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
new file mode 100644
index 0000000..962ca40
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.util.ExponentialSleepStrategy
+
+class TestExponentialSleepStrategy {
+
+ @Test def testGetNextDelayReturnsIncrementalDelay() = {
+ val st = new ExponentialSleepStrategy
+ var nextDelay = st.getNextDelay(0L)
+ assertEquals(nextDelay, 100L)
+ nextDelay = st.getNextDelay(nextDelay)
+ assertEquals(nextDelay, 200L)
+ nextDelay = st.getNextDelay(nextDelay)
+ assertEquals(nextDelay, 400L)
+ }
+
+ @Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached() = {
+ val st = new ExponentialSleepStrategy
+ var nextDelay = st.getNextDelay(6400L)
+ assertEquals(nextDelay, 10000L)
+ nextDelay = st.getNextDelay(nextDelay)
+ assertEquals(nextDelay, 10000L)
+ }
+
+ @Test def testSleepStrategyIsConfigurable() = {
+ val st = new ExponentialSleepStrategy(backOffMultiplier = 3.0, initialDelayMs = 10)
+ var nextDelay = st.getNextDelay(0L)
+ assertEquals(nextDelay, 10L)
+ nextDelay = st.getNextDelay(nextDelay)
+ assertEquals(nextDelay, 30L)
+ nextDelay = st.getNextDelay(nextDelay)
+ assertEquals(nextDelay, 90L)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 0d71582..10855dc 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -32,7 +32,7 @@ import java.util.Map.Entry
import scala.collection.mutable
import kafka.consumer.ConsumerConfig
import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
-import org.apache.samza.util.ExponentialThreadSleepStrategy
+import org.apache.samza.util.ExponentialSleepStrategy
/**
* Companion object for class JvmMetrics encapsulating various constants
@@ -125,7 +125,7 @@ class BrokerProxy(
val thread: Thread = new Thread(new Runnable() {
def run() {
info("Initialising sleep strategy");
- val sleepStrategy = new ExponentialThreadSleepStrategy
+ val sleepStrategy = new ExponentialSleepStrategy
info("Starting thread for BrokerProxy")
while (!Thread.currentThread.isInterrupted) {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 30e9fc3..ad5f2fa 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -25,6 +25,7 @@ import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.apache.samza.util.ExponentialSleepStrategy
import kafka.api._
import kafka.consumer.SimpleConsumer
import kafka.utils.Utils
@@ -122,6 +123,7 @@ class KafkaSystemAdmin(
var upcomingOffsets = Map[SystemStreamPartition, String]()
var done = false
var consumer: SimpleConsumer = null
+ val retryBackoff = new ExponentialSleepStrategy(initialDelayMs = 500)
debug("Fetching offsets for: %s" format streams)
@@ -176,6 +178,7 @@ class KafkaSystemAdmin(
// Retry.
warn("Unable to fetch last offsets for streams due to: %s, %s. Retrying. Turn on debugging to get a full stack trace." format (e.getMessage, streams))
debug(e)
+ retryBackoff.sleep
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala b/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala
deleted file mode 100644
index fde3cf3..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.util;
-
-import ExponentialThreadSleepStrategy._
-
-/**
- * Companion object for class ExponentialThreadSleep encapsulating various constants
- */
-object ExponentialThreadSleepStrategy {
- val SLEEP_BACK_OFF_MULTIPLIER = 2.0
- val SLEEP_INITIAL_BACK_OFF_DELAY_MS = 100
- val SLEEP_CAP_DELAY_MS = 10000
-}
-
-class ExponentialThreadSleepStrategy {
-
- /**
- * Sleep counter variable used to
- */
- var sleepBackOffMS = 0l
-
- val sleepMSBackOffDelay = SLEEP_INITIAL_BACK_OFF_DELAY_MS
-
- val sleepMSCapOnFetchMessagesError = SLEEP_CAP_DELAY_MS
-
- def sleep() = {
- val nextDelay = getNextDelay(sleepBackOffMS)
- Thread.sleep(nextDelay)
- sleepBackOffMS = nextDelay
- }
-
- def getNextDelay(previousDelay: Long) = {
- var nextDelay = 0l
- if (previousDelay == 0) {
- nextDelay = sleepMSBackOffDelay
- } else if (SLEEP_BACK_OFF_MULTIPLIER > 1) {
- nextDelay = (previousDelay * SLEEP_BACK_OFF_MULTIPLIER).asInstanceOf[Long]
- if (sleepMSCapOnFetchMessagesError != -1 && nextDelay > sleepMSCapOnFetchMessagesError) {
- nextDelay = scala.math.max(sleepMSCapOnFetchMessagesError, sleepMSBackOffDelay)
- }
- }
- nextDelay
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala b/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala
deleted file mode 100644
index f5e3e4c..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.samza.util.ExponentialThreadSleepStrategy
-import org.junit.Assert._
-import org.junit.Test
-
-class TestExponentialThreadSleepStrategy {
-
- @Test def testGetNextRedeliveryDelayCorrectlyReturnsMaximumDelayWhenDelayCapReached() = {
- val st = new ExponentialThreadSleepStrategy
- var nextDelay = st.getNextDelay(ExponentialThreadSleepStrategy.SLEEP_CAP_DELAY_MS)
- assertEquals(nextDelay, 10000l)
- nextDelay = st.getNextDelay(ExponentialThreadSleepStrategy.SLEEP_CAP_DELAY_MS)
- assertEquals(nextDelay, 10000l)
- }
-
- @Test def testGetNextRedeliveryDelayCorrectlyReturnsIncrementalDelay() = {
- val st = new ExponentialThreadSleepStrategy
- var nextDelay = st.getNextDelay(0l)
- assertEquals(nextDelay, 100l)
- nextDelay = st.getNextDelay(nextDelay)
- assertEquals(nextDelay, 200l)
- nextDelay = st.getNextDelay(nextDelay)
- assertEquals(nextDelay, 400l)
- }
-}
-
-