Posted to commits@gearpump.apache.org by ap...@apache.org on 2016/04/11 20:26:41 UTC

[39/50] incubator-gearpump git commit: fix #1972, backoff retry kafka consuming on exception

fix #1972, backoff retry kafka consuming on exception


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/9b1085cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/9b1085cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/9b1085cf

Branch: refs/heads/master
Commit: 9b1085cfe04c7d161372b60c499a00e89aaede51
Parents: 1db8606
Author: manuzhang <ow...@gmail.com>
Authored: Fri Feb 19 12:14:36 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Feb 19 21:20:42 2016 +0800

----------------------------------------------------------------------
 .../consumer/ExponentialBackoffSleeper.scala    |  58 ++++++++++
 .../kafka/lib/consumer/FetchThread.scala        |  56 ++++++++--
 .../kafka/lib/consumer/KafkaConsumer.scala      |   2 +
 .../streaming/kafka/lib/FetchThreadSpec.scala   | 108 -------------------
 .../streaming/kafka/lib/KafkaConsumerSpec.scala |  89 ---------------
 .../ExponentialBackoffSleeperSpec.scala         |  72 +++++++++++++
 .../kafka/lib/consumer/FetchThreadSpec.scala    | 108 +++++++++++++++++++
 .../kafka/lib/consumer/KafkaConsumerSpec.scala  |  88 +++++++++++++++
 8 files changed, 373 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
new file mode 100644
index 0000000..b821f15
--- /dev/null
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package io.gearpump.streaming.kafka.lib.consumer
+
+/**
+ * Sleeps for an exponentially increasing duration on each call,
+ * up to a maximum cap.
+ *
+ * @param backOffMultiplier The factor by which the duration increases.
+ * @param initialDurationMs Time in milliseconds for initial sleep.
+ * @param maximumDurationMs Cap up to which we will increase the duration.
+ */
+private[consumer] class ExponentialBackoffSleeper(
+    backOffMultiplier: Double = 2.0,
+    initialDurationMs: Long = 100,
+    maximumDurationMs: Long = 10000) {
+
+  require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
+  require(initialDurationMs > 0, "initialDurationMs must be positive")
+  require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be >= initialDurationMs")
+
+  private var sleepDuration = initialDurationMs
+
+  def reset(): Unit = {
+    sleepDuration = initialDurationMs
+  }
+
+  def sleep(): Unit = {
+    Thread.sleep(sleepDuration)
+    setNextSleepDuration()
+  }
+
+  def getSleepDuration: Long = sleepDuration
+
+  def setNextSleepDuration(): Unit = {
+    val next = (sleepDuration * backOffMultiplier).toLong
+    sleepDuration = math.min(math.max(initialDurationMs, next), maximumDurationMs)
+  }
+}
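
For context, a minimal sketch of the schedule this sleeper produces, assuming
the defaults above (the demo object is illustrative and not part of the commit;
it sits in the same package because the class is private[consumer]):

    package io.gearpump.streaming.kafka.lib.consumer

    // Walks the sleeper's duration schedule without actually sleeping.
    object BackoffScheduleDemo extends App {
      val sleeper = new ExponentialBackoffSleeper() // defaults: 2.0, 100 ms, 10000 ms
      (1 to 8).foreach { _ =>
        print(s"${sleeper.getSleepDuration} ")      // 100 200 400 800 1600 3200 6400 10000
        sleeper.setNextSleepDuration()
      }
      println()
      sleeper.reset()
      println(sleeper.getSleepDuration)             // back to 100 after reset
    }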

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
index 932939c..ee53151 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
@@ -34,29 +34,32 @@ object FetchThread {
             fetchSleepMS: Long,
             startOffsetTime: Long,
             consumerConfig: ConsumerConfig): FetchThread = {
-    val consumers: Map[TopicAndPartition, KafkaConsumer] = topicAndPartitions.map {
-      tp =>
-        tp -> KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)
-    }.toMap
+    val createConsumer = (tp: TopicAndPartition) =>
+      KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)
+
     val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-    new FetchThread(consumers, incomingQueue, fetchThreshold, fetchSleepMS)
+    new FetchThread(topicAndPartitions, createConsumer, incomingQueue, fetchThreshold, fetchSleepMS)
   }
 }
 
 /**
 * A thread that fetches messages from multiple kafka [[TopicAndPartition]]s and puts them
 * onto a queue, which is asynchronously polled by a consumer.
- * @param consumers [[KafkaConsumer]]s by kafka [[TopicAndPartition]]s
+ *
+ * @param createConsumer factory that creates a [[KafkaConsumer]] for a given [[TopicAndPartition]]
  * @param incomingQueue a queue to buffer incoming messages
 * @param fetchThreshold queue size above which the thread stops fetching messages
 * @param fetchSleepMS sleep interval in milliseconds when there are no more messages or the threshold is hit
  */
-private[kafka] class FetchThread(consumers: Map[TopicAndPartition, KafkaConsumer],
+private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
+                                 createConsumer: TopicAndPartition => KafkaConsumer,
                                  incomingQueue: LinkedBlockingQueue[KafkaMessage],
                                  fetchThreshold: Int,
                                  fetchSleepMS: Long) extends Thread {
   import FetchThread._
 
+  private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers
+
   def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = {
     consumers(tp).setStartOffset(startOffset)
   }
@@ -67,10 +70,29 @@ private[kafka] class FetchThread(consumers: Map[TopicAndPartition, KafkaConsumer
 
   override def run(): Unit = {
     try {
-      while (!Thread.currentThread.isInterrupted) {
-        val hasMoreMessages = fetchMessage
-        if (!hasMoreMessages || incomingQueue.size >= fetchThreshold) {
-          Thread.sleep(fetchSleepMS)
+      var nextOffsets = Map.empty[TopicAndPartition, Long]
+      var reset = false
+      val sleeper = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0,
+        initialDurationMs = 100L,
+        maximumDurationMs = 10000L)
+      while (!Thread.currentThread().isInterrupted) {
+        try {
+          if (reset) {
+            nextOffsets = consumers.mapValues(_.getNextOffset)
+            resetConsumers(nextOffsets)
+            reset = false
+          }
+          val hasMoreMessages = fetchMessage
+          sleeper.reset()
+          if (!hasMoreMessages || incomingQueue.size >= fetchThreshold) {
+            Thread.sleep(fetchSleepMS)
+          }
+        } catch {
+          case exception: Exception =>
+            LOG.warn(s"resetting consumers due to $exception")
+            reset = true
+            sleeper.sleep()
         }
       }
     } catch {
@@ -99,4 +121,16 @@ private[kafka] class FetchThread(consumers: Map[TopicAndPartition, KafkaConsumer
       }
     }
   }
+
+  private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = {
+    topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap
+  }
+
+  private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit = {
+    consumers.values.foreach(_.close())
+    consumers = createAllConsumers
+    consumers.foreach { case (tp, consumer) =>
+      consumer.setStartOffset(nextOffsets(tp))
+    }
+  }
 }
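
The rewritten run loop follows a standard backoff-retry shape: a fetch failure
schedules a consumer reset and sleeps with exponential backoff before the next
attempt, while a successful fetch resets the backoff to its initial duration.
A standalone sketch of that control flow, where fetchOnce and reconnect are
hypothetical stand-ins for fetchMessage and resetConsumers:

    package io.gearpump.streaming.kafka.lib.consumer

    // Control-flow sketch of FetchThread.run above; only the backoff-retry
    // shape mirrors the committed code.
    object BackoffRetrySketch {
      def loop(fetchOnce: () => Unit, reconnect: () => Unit): Unit = {
        val sleeper = new ExponentialBackoffSleeper(
          backOffMultiplier = 2.0,
          initialDurationMs = 100L,
          maximumDurationMs = 10000L)
        var needReset = false
        while (!Thread.currentThread().isInterrupted) {
          try {
            if (needReset) {
              reconnect()      // rebuild consumers at their saved offsets
              needReset = false
            }
            fetchOnce()
            sleeper.reset()    // success: next failure backs off from 100 ms again
          } catch {
            case _: Exception => // the commit logs a warning here
              needReset = true
              sleeper.sleep()    // 100 ms, 200 ms, 400 ms, ... capped at 10 s
          }
        }
      }
    }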

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
index 04b04ef..208a99d 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
@@ -92,6 +92,8 @@ class KafkaConsumer(consumer: SimpleConsumer,
     hasNextHelper(iterator, newIterator = false)
   }
 
+  def getNextOffset: Long = nextOffset
+
   def close(): Unit = {
     consumer.close()
   }
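
The new getNextOffset getter exposes the consumer's current position so that
FetchThread can rebuild failed consumers without losing their place. A minimal
sketch of that handoff, assuming a hypothetical helper alongside the classes
above:

    package io.gearpump.streaming.kafka.lib.consumer

    // Carries the fetch position across a consumer reset, mirroring
    // FetchThread.resetConsumers; `old` and `fresh` are hypothetical instances.
    object OffsetHandoff {
      def replace(old: KafkaConsumer, fresh: KafkaConsumer): KafkaConsumer = {
        val resumeOffset = old.getNextOffset // position after the last fetched message
        old.close()
        fresh.setStartOffset(resumeOffset)   // resume where the old consumer stopped
        fresh
      }
    }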

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala
deleted file mode 100644
index af59296..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage, KafkaConsumer}
-import kafka.common.TopicAndPartition
-import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  val nonNegativeGen = Gen.choose[Int](0, 1000)
-  val positiveGen = Gen.choose[Int](1, 1000)
-  val startOffsetGen = Gen.choose[Long](0L, 1000L)
-  property("FetchThread should set startOffset to iterators") {
-    forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) {
-      (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) =>
-      val topicAndPartition = mock[TopicAndPartition]
-      val consumer = mock[KafkaConsumer]
-      val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-      val fetchThread = new FetchThread(Map(topicAndPartition -> consumer),
-        incomingQueue, fetchThreshold, fetchSleepMS)
-      fetchThread.setStartOffset(topicAndPartition, startOffset)
-      verify(consumer).setStartOffset(startOffset)
-    }
-  }
-
-  val topicAndPartitionGen = for {
-    topic <- Gen.alphaStr
-    partition <- Gen.choose[Int](0, Int.MaxValue)
-  } yield TopicAndPartition(topic, partition)
-  property("FetchThread should only fetchMessage when the number of messages in queue is below the threshold") {
-    forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) {
-      (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int,
-       startOffset: Long, topicAndPartition: TopicAndPartition) =>
-        val message = mock[KafkaMessage]
-        val consumer = mock[KafkaConsumer]
-        when(consumer.hasNext).thenReturn(true)
-        when(consumer.next).thenReturn(message)
-        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-        val fetchThread = new FetchThread(
-          Map(topicAndPartition -> consumer),
-          incomingQueue, fetchThreshold, fetchSleepMS)
-
-        0.until(messageNum) foreach { _ =>
-          fetchThread.fetchMessage
-        }
-
-        incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold)
-    }
-  }
-
-  property("FetchThread poll should try to retrieve and remove the head of incoming queue") {
-    val topicAndPartition = mock[TopicAndPartition]
-    val consumer = mock[KafkaConsumer]
-    val kafkaMsg = mock[KafkaMessage]
-    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-    incomingQueue.put(kafkaMsg)
-    val fetchThread = new FetchThread(Map(topicAndPartition -> consumer), incomingQueue, 0, 0)
-    fetchThread.poll shouldBe Some(kafkaMsg)
-    fetchThread.poll shouldBe None
-  }
-
-  val tpAndHasNextGen = for {
-    tp <- topicAndPartitionGen
-    hasNext <- Gen.oneOf(true, false)
-  } yield (tp, hasNext)
-  val tpAndHasNextListGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen) suchThat (_.size > 0)
-  property("FetchThread fetchMessage should return false when there are no more messages from any TopicAndPartition") {
-    forAll(tpAndHasNextListGen, nonNegativeGen) {
-      (tps: List[(TopicAndPartition, Boolean)], fetchSleepMS: Int) =>
-      val tpAndIterators = tps.map { case (tp, hasNext) =>
-          val consumer = mock[KafkaConsumer]
-          val kafkaMsg = mock[KafkaMessage]
-          when(consumer.hasNext).thenReturn(hasNext)
-          when(consumer.next).thenReturn(kafkaMsg)
-          tp -> consumer
-      }.toMap
-
-      val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-      val fetchThread = new FetchThread(
-        tpAndIterators, incomingQueue, tpAndIterators.size + 1, fetchSleepMS)
-      fetchThread.fetchMessage shouldBe tps.map(_._2).reduce(_ || _)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala
deleted file mode 100644
index aae545c..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import com.twitter.bijection.Injection
-import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
-import kafka.api.OffsetRequest
-import kafka.common.TopicAndPartition
-import kafka.consumer.SimpleConsumer
-import kafka.message.{Message, MessageAndOffset}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-  val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, Array[Byte]](msg)))
-  val messageNumGen = Gen.choose[Int](0, 1000)
-  val topicAndPartitionGen = for {
-    topic <- Gen.alphaStr
-    partition <- Gen.choose[Int](0, Int.MaxValue)
-  } yield (topic, partition)
-
-  property("KafkaConsumer should iterate MessageAndOffset calling hasNext and next") {
-    forAll(messageGen, messageNumGen, topicAndPartitionGen) {
-      (message: Message, num: Int, topicAndPartition: (String, Int)) =>
-        val (topic, partition) = topicAndPartition
-        val consumer = mock[SimpleConsumer]
-        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
-          OffsetRequest.EarliestTime, -1)).thenReturn(0)
-        val iterator = 0.until(num).map(index => MessageAndOffset(message, index.toLong)).iterator
-        val getIterator = (offset: Long) => iterator
-        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
-        0.until(num).foreach { i =>
-          kafkaConsumer.hasNext shouldBe true
-          val kafkaMessage = kafkaConsumer.next
-          kafkaMessage.offset shouldBe i.toLong
-          kafkaMessage.key shouldBe None
-        }
-        kafkaConsumer.hasNext shouldBe false
-    }
-  }
-
-  val startOffsetGen = Gen.choose[Long](1L, 1000L)
-  property("KafkaConsumer setStartOffset should reset internal iterator") {
-    forAll(topicAndPartitionGen, startOffsetGen) {
-      (topicAndPartition: (String, Int), startOffset: Long) =>
-        val (topic, partition) = topicAndPartition
-        val consumer = mock[SimpleConsumer]
-        val getIterator = mock[Long => Iterator[MessageAndOffset]]
-        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
-          OffsetRequest.EarliestTime, -1)).thenReturn(0)
-        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
-        kafkaConsumer.setStartOffset(startOffset)
-        verify(getIterator).apply(startOffset)
-    }
-  }
-
-  property("KafkaConsumer close should close SimpleConsumer") {
-    forAll(topicAndPartitionGen) {
-      (topicAndPartition: (String, Int)) =>
-        val (topic, partition) = topicAndPartition
-        val consumer = mock[SimpleConsumer]
-        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
-          OffsetRequest.EarliestTime, -1)).thenReturn(0)
-        val getIterator = mock[Long => Iterator[MessageAndOffset]]
-        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
-        kafkaConsumer.close()
-        verify(consumer).close()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
new file mode 100644
index 0000000..a20e575
--- /dev/null
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package io.gearpump.streaming.kafka.lib.consumer
+
+import org.scalatest.{WordSpec, Matchers}
+
+class ExponentialBackoffSleeperSpec extends WordSpec with Matchers {
+
+  "ExponentialBackOffSleeper" should {
+    "sleep for increasing duration" in {
+      val sleeper = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0,
+        initialDurationMs = 100,
+        maximumDurationMs = 10000
+      )
+      sleeper.getSleepDuration shouldBe 100
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 200
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 400
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 800
+    }
+
+    "sleep for no more than maximum duration" in {
+      val sleeper = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0,
+        initialDurationMs = 6400,
+        maximumDurationMs = 10000
+      )
+      sleeper.getSleepDuration shouldBe 6400
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 10000
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 10000
+    }
+
+    "sleep for initial duration after reset" in {
+      val sleeper = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0,
+        initialDurationMs = 100,
+        maximumDurationMs = 10000
+      )
+      sleeper.getSleepDuration shouldBe 100
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 200
+      sleeper.reset()
+      sleeper.getSleepDuration shouldBe 100
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
new file mode 100644
index 0000000..92d3c31
--- /dev/null
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.kafka.lib.consumer
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import kafka.common.TopicAndPartition
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  val nonNegativeGen = Gen.choose[Int](0, 1000)
+  val positiveGen = Gen.choose[Int](1, 1000)
+  val startOffsetGen = Gen.choose[Long](0L, 1000L)
+  property("FetchThread should set startOffset to iterators") {
+    forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) {
+      (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) =>
+      val topicAndPartition = mock[TopicAndPartition]
+      val consumer = mock[KafkaConsumer]
+      val createConsumer = (tp: TopicAndPartition) => consumer
+      val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+      val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer,
+        incomingQueue, fetchThreshold, fetchSleepMS)
+      fetchThread.setStartOffset(topicAndPartition, startOffset)
+      verify(consumer).setStartOffset(startOffset)
+    }
+  }
+
+  val topicAndPartitionGen = for {
+    topic <- Gen.alphaStr
+    partition <- Gen.choose[Int](0, Int.MaxValue)
+  } yield TopicAndPartition(topic, partition)
+  property("FetchThread should only fetchMessage when the number of messages in queue is below the threshold") {
+    forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) {
+      (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int,
+       startOffset: Long, topicAndPartition: TopicAndPartition) =>
+        val message = mock[KafkaMessage]
+        val consumer = mock[KafkaConsumer]
+        val createConsumer = (tp: TopicAndPartition) => consumer
+        when(consumer.hasNext).thenReturn(true)
+        when(consumer.next).thenReturn(message)
+        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+        val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer,
+          incomingQueue, fetchThreshold, fetchSleepMS)
+
+        0.until(messageNum) foreach { _ =>
+          fetchThread.fetchMessage
+        }
+
+        incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold)
+    }
+  }
+
+  property("FetchThread poll should try to retrieve and remove the head of incoming queue") {
+    val topicAndPartition = mock[TopicAndPartition]
+    val consumer = mock[KafkaConsumer]
+    val createConsumer = (tp: TopicAndPartition) => consumer
+    val kafkaMsg = mock[KafkaMessage]
+    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+    incomingQueue.put(kafkaMsg)
+    val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, incomingQueue, 0, 0)
+    fetchThread.poll shouldBe Some(kafkaMsg)
+    fetchThread.poll shouldBe None
+  }
+
+  val tpAndHasNextGen = for {
+    tp <- topicAndPartitionGen
+    hasNext <- Gen.oneOf(true, false)
+  } yield (tp, hasNext)
+  val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen).map(_.toMap) suchThat (_.nonEmpty)
+  property("FetchThread fetchMessage should return false when there are no more messages from any TopicAndPartition") {
+    forAll(tpHasNextMapGen, nonNegativeGen) {
+      (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) =>
+        val createConsumer = (tp: TopicAndPartition) => {
+          val consumer = mock[KafkaConsumer]
+          val kafkaMsg = mock[KafkaMessage]
+          val hasNext = tpHasNextMap(tp)
+          when(consumer.hasNext).thenReturn(hasNext)
+          when(consumer.next).thenReturn(kafkaMsg)
+          consumer
+        }
+        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+        val fetchThread = new FetchThread(tpHasNextMap.keys.toArray,
+          createConsumer, incomingQueue, tpHasNextMap.size + 1, fetchSleepMS)
+        fetchThread.fetchMessage shouldBe tpHasNextMap.values.reduce(_ || _)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
new file mode 100644
index 0000000..2669f60
--- /dev/null
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.kafka.lib.consumer
+
+import com.twitter.bijection.Injection
+import kafka.api.OffsetRequest
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.{Message, MessageAndOffset}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, Array[Byte]](msg)))
+  val messageNumGen = Gen.choose[Int](0, 1000)
+  val topicAndPartitionGen = for {
+    topic <- Gen.alphaStr
+    partition <- Gen.choose[Int](0, Int.MaxValue)
+  } yield (topic, partition)
+
+  property("KafkaConsumer should iterate MessageAndOffset calling hasNext and next") {
+    forAll(messageGen, messageNumGen, topicAndPartitionGen) {
+      (message: Message, num: Int, topicAndPartition: (String, Int)) =>
+        val (topic, partition) = topicAndPartition
+        val consumer = mock[SimpleConsumer]
+        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
+          OffsetRequest.EarliestTime, -1)).thenReturn(0)
+        val iterator = 0.until(num).map(index => MessageAndOffset(message, index.toLong)).iterator
+        val getIterator = (offset: Long) => iterator
+        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
+        0.until(num).foreach { i =>
+          kafkaConsumer.hasNext shouldBe true
+          val kafkaMessage = kafkaConsumer.next
+          kafkaMessage.offset shouldBe i.toLong
+          kafkaMessage.key shouldBe None
+        }
+        kafkaConsumer.hasNext shouldBe false
+    }
+  }
+
+  val startOffsetGen = Gen.choose[Long](1L, 1000L)
+  property("KafkaConsumer setStartOffset should reset internal iterator") {
+    forAll(topicAndPartitionGen, startOffsetGen) {
+      (topicAndPartition: (String, Int), startOffset: Long) =>
+        val (topic, partition) = topicAndPartition
+        val consumer = mock[SimpleConsumer]
+        val getIterator = mock[Long => Iterator[MessageAndOffset]]
+        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
+          OffsetRequest.EarliestTime, -1)).thenReturn(0)
+        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
+        kafkaConsumer.setStartOffset(startOffset)
+        verify(getIterator).apply(startOffset)
+    }
+  }
+
+  property("KafkaConsumer close should close SimpleConsumer") {
+    forAll(topicAndPartitionGen) {
+      (topicAndPartition: (String, Int)) =>
+        val (topic, partition) = topicAndPartition
+        val consumer = mock[SimpleConsumer]
+        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
+          OffsetRequest.EarliestTime, -1)).thenReturn(0)
+        val getIterator = mock[Long => Iterator[MessageAndOffset]]
+        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
+        kafkaConsumer.close()
+        verify(consumer).close()
+    }
+  }
+}