You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/05 20:04:35 UTC

git commit: SAMZA-168; add exponential backoff to kafka system admin

Repository: incubator-samza
Updated Branches:
  refs/heads/master f5741125a -> 5ae028595


SAMZA-168; add exponential backoff to kafka system admin


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5ae02859
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5ae02859
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5ae02859

Branch: refs/heads/master
Commit: 5ae02859552778193e4247daa65a00414a8708c1
Parents: f574112
Author: Martin Kleppmann <ma...@rapportive.com>
Authored: Wed Mar 5 11:04:27 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Mar 5 11:04:27 2014 -0800

----------------------------------------------------------------------
 .../samza/util/ExponentialSleepStrategy.scala   | 45 ++++++++++++++
 .../util/TestExponentialSleepStrategy.scala     | 57 +++++++++++++++++
 .../apache/samza/system/kafka/BrokerProxy.scala |  4 +-
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  3 +
 .../util/ExponentialThreadSleepStrategy.scala   | 65 --------------------
 .../TestExponentialThreadSleepStrategy.scala    | 47 --------------
 6 files changed, 107 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
new file mode 100644
index 0000000..b3c9263
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+/** Blocking exponential back-off: the first sleep() lasts initialDelayMs and each later one lasts
+ *  backOffMultiplier times the previous, capped at maximumDelayMs. Invalid arguments fail require(). */
+class ExponentialSleepStrategy(
+    backOffMultiplier: Double = 2.0,
+    initialDelayMs: Long = 100,
+    maximumDelayMs: Long = 10000) {
+  require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
+  require(initialDelayMs > 0, "initialDelayMs must be positive")
+  require(maximumDelayMs >= initialDelayMs, "maximumDelayMs must be >= initialDelayMs")
+  var previousDelay: Long = 0L // delay (ms) used by the last completed sleep(); 0 before the first one
+
+  /** Sleeps the calling thread for the next delay; an InterruptedException propagates to the caller. */
+  def sleep(): Unit = {
+    val nextDelay = getNextDelay(previousDelay)
+    Thread.sleep(nextDelay)
+    previousDelay = nextDelay // only reached when the sleep was not interrupted
+  }
+
+  /** Scales the given delay (the parameter shadows the field) and clamps it to [initial, maximum]. */
+  def getNextDelay(previousDelay: Long): Long =
+    math.min(math.max(initialDelayMs, (previousDelay * backOffMultiplier).toLong), maximumDelayMs)
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
new file mode 100644
index 0000000..962ca40
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.util.ExponentialSleepStrategy
+
+/** Checks the delay sequence of ExponentialSleepStrategy.getNextDelay; sleep() is never called here. */
+class TestExponentialSleepStrategy {
+  @Test def testGetNextDelayReturnsIncrementalDelay(): Unit = { // default arguments: 100 ms, then x2
+    val st = new ExponentialSleepStrategy
+    val first = st.getNextDelay(0L)
+    assertEquals(100L, first) // JUnit order is (expected, actual)
+    val second = st.getNextDelay(first)
+    assertEquals(200L, second)
+    val third = st.getNextDelay(second)
+    assertEquals(400L, third)
+  }
+
+  @Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached(): Unit = {
+    val st = new ExponentialSleepStrategy
+    val capped = st.getNextDelay(6400L) // 6400 * 2.0 exceeds the default 10000 ms maximum
+    assertEquals(10000L, capped)
+    val stillCapped = st.getNextDelay(capped)
+    assertEquals(10000L, stillCapped)
+  }
+
+  @Test def testSleepStrategyIsConfigurable(): Unit = { // custom multiplier and initial delay
+    val st = new ExponentialSleepStrategy(backOffMultiplier = 3.0, initialDelayMs = 10)
+    val first = st.getNextDelay(0L)
+    assertEquals(10L, first)
+    val second = st.getNextDelay(first)
+    assertEquals(30L, second)
+    val third = st.getNextDelay(second)
+    assertEquals(90L, third)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 0d71582..10855dc 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -32,7 +32,7 @@ import java.util.Map.Entry
 import scala.collection.mutable
 import kafka.consumer.ConsumerConfig
 import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
-import org.apache.samza.util.ExponentialThreadSleepStrategy
+import org.apache.samza.util.ExponentialSleepStrategy
 
 /**
  *  Companion object for class JvmMetrics encapsulating various constants
@@ -125,7 +125,7 @@ class BrokerProxy(
   val thread: Thread = new Thread(new Runnable() {
     def run() {
       info("Initialising sleep strategy");
-      val sleepStrategy = new ExponentialThreadSleepStrategy
+      val sleepStrategy = new ExponentialSleepStrategy
       info("Starting thread for BrokerProxy")
 
       while (!Thread.currentThread.isInterrupted) {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 30e9fc3..ad5f2fa 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -25,6 +25,7 @@ import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.apache.samza.util.ExponentialSleepStrategy
 import kafka.api._
 import kafka.consumer.SimpleConsumer
 import kafka.utils.Utils
@@ -122,6 +123,7 @@ class KafkaSystemAdmin(
     var upcomingOffsets = Map[SystemStreamPartition, String]()
     var done = false
     var consumer: SimpleConsumer = null
+    val retryBackoff = new ExponentialSleepStrategy(initialDelayMs = 500)
 
     debug("Fetching offsets for: %s" format streams)
 
@@ -176,6 +178,7 @@ class KafkaSystemAdmin(
           // Retry.
           warn("Unable to fetch last offsets for streams due to: %s, %s. Retrying. Turn on debugging to get a full stack trace." format (e.getMessage, streams))
           debug(e)
+          retryBackoff.sleep
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala b/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala
deleted file mode 100644
index fde3cf3..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/util/ExponentialThreadSleepStrategy.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.util;
-
-import ExponentialThreadSleepStrategy._
-
-/**
- *  Companion object for class ExponentialThreadSleep encapsulating various constants
- */
-object ExponentialThreadSleepStrategy {
-  val SLEEP_BACK_OFF_MULTIPLIER = 2.0
-  val SLEEP_INITIAL_BACK_OFF_DELAY_MS = 100
-  val SLEEP_CAP_DELAY_MS = 10000
-}
-
-class ExponentialThreadSleepStrategy {
-
-  /**
-   * Sleep counter variable used to
-   */
-  var sleepBackOffMS = 0l
-
-  val sleepMSBackOffDelay = SLEEP_INITIAL_BACK_OFF_DELAY_MS
-
-  val sleepMSCapOnFetchMessagesError = SLEEP_CAP_DELAY_MS
-
-  def sleep() = {
-    val nextDelay = getNextDelay(sleepBackOffMS)
-    Thread.sleep(nextDelay)
-    sleepBackOffMS = nextDelay
-  }
-
-  def getNextDelay(previousDelay: Long) = {
-    var nextDelay = 0l
-    if (previousDelay == 0) {
-      nextDelay = sleepMSBackOffDelay
-    } else if (SLEEP_BACK_OFF_MULTIPLIER > 1) {
-      nextDelay = (previousDelay * SLEEP_BACK_OFF_MULTIPLIER).asInstanceOf[Long]
-      if (sleepMSCapOnFetchMessagesError != -1 && nextDelay > sleepMSCapOnFetchMessagesError) {
-        nextDelay = scala.math.max(sleepMSCapOnFetchMessagesError, sleepMSBackOffDelay)
-      }
-    } 
-    nextDelay
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ae02859/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala b/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala
deleted file mode 100644
index f5e3e4c..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/util/TestExponentialThreadSleepStrategy.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.samza.util.ExponentialThreadSleepStrategy
-import org.junit.Assert._
-import org.junit.Test
-
-class TestExponentialThreadSleepStrategy {
-
-  @Test def testGetNextRedeliveryDelayCorrectlyReturnsMaximumDelayWhenDelayCapReached() = {
-    val st = new ExponentialThreadSleepStrategy
-    var nextDelay = st.getNextDelay(ExponentialThreadSleepStrategy.SLEEP_CAP_DELAY_MS)
-    assertEquals(nextDelay, 10000l)
-    nextDelay = st.getNextDelay(ExponentialThreadSleepStrategy.SLEEP_CAP_DELAY_MS)
-    assertEquals(nextDelay, 10000l)
-  }
-
-  @Test def testGetNextRedeliveryDelayCorrectlyReturnsIncrementalDelay() = {
-    val st = new ExponentialThreadSleepStrategy
-    var nextDelay = st.getNextDelay(0l)
-    assertEquals(nextDelay, 100l)
-    nextDelay = st.getNextDelay(nextDelay)
-    assertEquals(nextDelay, 200l)
-    nextDelay = st.getNextDelay(nextDelay)
-    assertEquals(nextDelay, 400l)
-  }
-}
-
-