You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ap...@apache.org on 2016/04/11 20:26:41 UTC
[39/50] incubator-gearpump git commit: fix #1972,
backoff retry kafka consuming on exception
fix #1972, backoff retry kafka consuming on exception
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/9b1085cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/9b1085cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/9b1085cf
Branch: refs/heads/master
Commit: 9b1085cfe04c7d161372b60c499a00e89aaede51
Parents: 1db8606
Author: manuzhang <ow...@gmail.com>
Authored: Fri Feb 19 12:14:36 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Feb 19 21:20:42 2016 +0800
----------------------------------------------------------------------
.../consumer/ExponentialBackoffSleeper.scala | 58 ++++++++++
.../kafka/lib/consumer/FetchThread.scala | 56 ++++++++--
.../kafka/lib/consumer/KafkaConsumer.scala | 2 +
.../streaming/kafka/lib/FetchThreadSpec.scala | 108 -------------------
.../streaming/kafka/lib/KafkaConsumerSpec.scala | 89 ---------------
.../ExponentialBackoffSleeperSpec.scala | 72 +++++++++++++
.../kafka/lib/consumer/FetchThreadSpec.scala | 108 +++++++++++++++++++
.../kafka/lib/consumer/KafkaConsumerSpec.scala | 88 +++++++++++++++
8 files changed, 373 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
new file mode 100644
index 0000000..b821f15
--- /dev/null
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package io.gearpump.streaming.kafka.lib.consumer
+
+/**
+ * someone sleeps for exponentially increasing duration each time
+ * until the cap
+ *
+ * @param backOffMultiplier The factor by which the duration increases.
+ * @param initialDurationMs Time in milliseconds for initial sleep.
+ * @param maximumDurationMs Cap up to which we will increase the duration.
+ */
+private[consumer] class ExponentialBackoffSleeper(
+ backOffMultiplier: Double = 2.0,
+ initialDurationMs: Long = 100,
+ maximumDurationMs: Long = 10000) {
+
+ require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
+ require(initialDurationMs > 0, "initialDurationMs must be positive")
+ require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be >= initialDurationMs")
+
+ private var sleepDuration = initialDurationMs
+
+ def reset(): Unit = {
+ sleepDuration = initialDurationMs
+ }
+
+ def sleep(): Unit = {
+ Thread.sleep(sleepDuration)
+ setNextSleepDuration()
+ }
+
+ def getSleepDuration: Long = sleepDuration
+
+ def setNextSleepDuration(): Unit = {
+ val next = (sleepDuration * backOffMultiplier).asInstanceOf[Long]
+ sleepDuration = math.min(math.max(initialDurationMs, next), maximumDurationMs)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
index 932939c..ee53151 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
@@ -34,29 +34,32 @@ object FetchThread {
fetchSleepMS: Long,
startOffsetTime: Long,
consumerConfig: ConsumerConfig): FetchThread = {
- val consumers: Map[TopicAndPartition, KafkaConsumer] = topicAndPartitions.map {
- tp =>
- tp -> KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)
- }.toMap
+ val createConsumer = (tp: TopicAndPartition) =>
+ KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)
+
val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
- new FetchThread(consumers, incomingQueue, fetchThreshold, fetchSleepMS)
+ new FetchThread(topicAndPartitions, createConsumer, incomingQueue, fetchThreshold, fetchSleepMS)
}
}
/**
* A thread to fetch messages from multiple kafka [[TopicAndPartition]]s and puts them
* onto a queue, which is asynchronously polled by a consumer
- * @param consumers [[KafkaConsumer]]s by kafka [[TopicAndPartition]]s
+ *
+ * @param createConsumer given a [[TopicAndPartition]], create a [[KafkaConsumer]] to connect to it
* @param incomingQueue a queue to buffer incoming messages
* @param fetchThreshold above which thread should stop fetching messages
* @param fetchSleepMS interval to sleep when no more messages or hitting fetchThreshold
*/
-private[kafka] class FetchThread(consumers: Map[TopicAndPartition, KafkaConsumer],
+private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
+ createConsumer: TopicAndPartition => KafkaConsumer,
incomingQueue: LinkedBlockingQueue[KafkaMessage],
fetchThreshold: Int,
fetchSleepMS: Long) extends Thread {
import FetchThread._
+ private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers
+
def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = {
consumers(tp).setStartOffset(startOffset)
}
@@ -67,10 +70,29 @@ private[kafka] class FetchThread(consumers: Map[TopicAndPartition, KafkaConsumer
override def run(): Unit = {
try {
- while (!Thread.currentThread.isInterrupted) {
- val hasMoreMessages = fetchMessage
- if (!hasMoreMessages || incomingQueue.size >= fetchThreshold) {
- Thread.sleep(fetchSleepMS)
+ var nextOffsets = Map.empty[TopicAndPartition, Long]
+ var reset = false
+ val sleeper = new ExponentialBackoffSleeper(
+ backOffMultiplier = 2.0,
+ initialDurationMs = 100L,
+ maximumDurationMs = 10000L)
+ while (!Thread.currentThread().isInterrupted) {
+ try {
+ if (reset) {
+ nextOffsets = consumers.mapValues(_.getNextOffset)
+ resetConsumers(nextOffsets)
+ reset = false
+ }
+ val hasMoreMessages = fetchMessage
+ sleeper.reset()
+ if (!hasMoreMessages || incomingQueue.size >= fetchThreshold) {
+ Thread.sleep(fetchSleepMS)
+ }
+ } catch {
+ case exception: Exception =>
+ LOG.warn(s"resetting consumers due to $exception")
+ reset = true
+ sleeper.sleep()
}
}
} catch {
@@ -99,4 +121,16 @@ private[kafka] class FetchThread(consumers: Map[TopicAndPartition, KafkaConsumer
}
}
}
+
+ private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = {
+ topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap
+ }
+
+ private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit = {
+ consumers.values.foreach(_.close())
+ consumers = createAllConsumers
+ consumers.foreach { case (tp, consumer) =>
+ consumer.setStartOffset(nextOffsets(tp))
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
index 04b04ef..208a99d 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
@@ -92,6 +92,8 @@ class KafkaConsumer(consumer: SimpleConsumer,
hasNextHelper(iterator, newIterator = false)
}
+ def getNextOffset: Long = nextOffset
+
def close(): Unit = {
consumer.close()
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala
deleted file mode 100644
index af59296..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage, KafkaConsumer}
-import kafka.common.TopicAndPartition
-import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
- val nonNegativeGen = Gen.choose[Int](0, 1000)
- val positiveGen = Gen.choose[Int](1, 1000)
- val startOffsetGen = Gen.choose[Long](0L, 1000L)
- property("FetchThread should set startOffset to iterators") {
- forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) {
- (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) =>
- val topicAndPartition = mock[TopicAndPartition]
- val consumer = mock[KafkaConsumer]
- val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
- val fetchThread = new FetchThread(Map(topicAndPartition -> consumer),
- incomingQueue, fetchThreshold, fetchSleepMS)
- fetchThread.setStartOffset(topicAndPartition, startOffset)
- verify(consumer).setStartOffset(startOffset)
- }
- }
-
- val topicAndPartitionGen = for {
- topic <- Gen.alphaStr
- partition <- Gen.choose[Int](0, Int.MaxValue)
- } yield TopicAndPartition(topic, partition)
- property("FetchThread should only fetchMessage when the number of messages in queue is below the threshold") {
- forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) {
- (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int,
- startOffset: Long, topicAndPartition: TopicAndPartition) =>
- val message = mock[KafkaMessage]
- val consumer = mock[KafkaConsumer]
- when(consumer.hasNext).thenReturn(true)
- when(consumer.next).thenReturn(message)
- val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
- val fetchThread = new FetchThread(
- Map(topicAndPartition -> consumer),
- incomingQueue, fetchThreshold, fetchSleepMS)
-
- 0.until(messageNum) foreach { _ =>
- fetchThread.fetchMessage
- }
-
- incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold)
- }
- }
-
- property("FetchThread poll should try to retrieve and remove the head of incoming queue") {
- val topicAndPartition = mock[TopicAndPartition]
- val consumer = mock[KafkaConsumer]
- val kafkaMsg = mock[KafkaMessage]
- val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
- incomingQueue.put(kafkaMsg)
- val fetchThread = new FetchThread(Map(topicAndPartition -> consumer), incomingQueue, 0, 0)
- fetchThread.poll shouldBe Some(kafkaMsg)
- fetchThread.poll shouldBe None
- }
-
- val tpAndHasNextGen = for {
- tp <- topicAndPartitionGen
- hasNext <- Gen.oneOf(true, false)
- } yield (tp, hasNext)
- val tpAndHasNextListGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen) suchThat (_.size > 0)
- property("FetchThread fetchMessage should return false when there are no more messages from any TopicAndPartition") {
- forAll(tpAndHasNextListGen, nonNegativeGen) {
- (tps: List[(TopicAndPartition, Boolean)], fetchSleepMS: Int) =>
- val tpAndIterators = tps.map { case (tp, hasNext) =>
- val consumer = mock[KafkaConsumer]
- val kafkaMsg = mock[KafkaMessage]
- when(consumer.hasNext).thenReturn(hasNext)
- when(consumer.next).thenReturn(kafkaMsg)
- tp -> consumer
- }.toMap
-
- val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
- val fetchThread = new FetchThread(
- tpAndIterators, incomingQueue, tpAndIterators.size + 1, fetchSleepMS)
- fetchThread.fetchMessage shouldBe tps.map(_._2).reduce(_ || _)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala
deleted file mode 100644
index aae545c..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import com.twitter.bijection.Injection
-import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
-import kafka.api.OffsetRequest
-import kafka.common.TopicAndPartition
-import kafka.consumer.SimpleConsumer
-import kafka.message.{Message, MessageAndOffset}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, Array[Byte]](msg)))
- val messageNumGen = Gen.choose[Int](0, 1000)
- val topicAndPartitionGen = for {
- topic <- Gen.alphaStr
- partition <- Gen.choose[Int](0, Int.MaxValue)
- } yield (topic, partition)
-
- property("KafkaConsumer should iterate MessageAndOffset calling hasNext and next") {
- forAll(messageGen, messageNumGen, topicAndPartitionGen) {
- (message: Message, num: Int, topicAndPartition: (String, Int)) =>
- val (topic, partition) = topicAndPartition
- val consumer = mock[SimpleConsumer]
- when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
- OffsetRequest.EarliestTime, -1)).thenReturn(0)
- val iterator = 0.until(num).map(index => MessageAndOffset(message, index.toLong)).iterator
- val getIterator = (offset: Long) => iterator
- val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
- 0.until(num).foreach { i =>
- kafkaConsumer.hasNext shouldBe true
- val kafkaMessage = kafkaConsumer.next
- kafkaMessage.offset shouldBe i.toLong
- kafkaMessage.key shouldBe None
- }
- kafkaConsumer.hasNext shouldBe false
- }
- }
-
- val startOffsetGen = Gen.choose[Long](1L, 1000L)
- property("KafkaConsumer setStartOffset should reset internal iterator") {
- forAll(topicAndPartitionGen, startOffsetGen) {
- (topicAndPartition: (String, Int), startOffset: Long) =>
- val (topic, partition) = topicAndPartition
- val consumer = mock[SimpleConsumer]
- val getIterator = mock[Long => Iterator[MessageAndOffset]]
- when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
- OffsetRequest.EarliestTime, -1)).thenReturn(0)
- val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
- kafkaConsumer.setStartOffset(startOffset)
- verify(getIterator).apply(startOffset)
- }
- }
-
- property("KafkaConsumer close should close SimpleConsumer") {
- forAll(topicAndPartitionGen) {
- (topicAndPartition: (String, Int)) =>
- val (topic, partition) = topicAndPartition
- val consumer = mock[SimpleConsumer]
- when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
- OffsetRequest.EarliestTime, -1)).thenReturn(0)
- val getIterator = mock[Long => Iterator[MessageAndOffset]]
- val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
- kafkaConsumer.close()
- verify(consumer).close()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
new file mode 100644
index 0000000..a20e575
--- /dev/null
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package io.gearpump.streaming.kafka.lib.consumer
+
+import org.scalatest.{WordSpec, Matchers}
+
+class ExponentialBackoffSleeperSpec extends WordSpec with Matchers {
+
+ "ExponentialBackOffSleeper" should {
+ "sleep for increasing duration" in {
+ val sleeper = new ExponentialBackoffSleeper(
+ backOffMultiplier = 2.0,
+ initialDurationMs = 100,
+ maximumDurationMs = 10000
+ )
+ sleeper.getSleepDuration shouldBe 100
+ sleeper.setNextSleepDuration()
+ sleeper.getSleepDuration shouldBe 200
+ sleeper.setNextSleepDuration()
+ sleeper.getSleepDuration shouldBe 400
+ sleeper.setNextSleepDuration()
+ sleeper.getSleepDuration shouldBe 800
+ }
+
+ "sleep for no more than maximum duration" in {
+ val sleeper = new ExponentialBackoffSleeper(
+ backOffMultiplier = 2.0,
+ initialDurationMs = 6400,
+ maximumDurationMs = 10000
+ )
+ sleeper.getSleepDuration shouldBe 6400
+ sleeper.setNextSleepDuration()
+ sleeper.getSleepDuration shouldBe 10000
+ sleeper.setNextSleepDuration()
+ sleeper.getSleepDuration shouldBe 10000
+ }
+
+ "sleep for initial duration after reset" in {
+ val sleeper = new ExponentialBackoffSleeper(
+ backOffMultiplier = 2.0,
+ initialDurationMs = 100,
+ maximumDurationMs = 10000
+ )
+ sleeper.getSleepDuration shouldBe 100
+ sleeper.setNextSleepDuration()
+ sleeper.getSleepDuration shouldBe 200
+ sleeper.reset()
+ sleeper.getSleepDuration shouldBe 100
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
new file mode 100644
index 0000000..92d3c31
--- /dev/null
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.kafka.lib.consumer
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import kafka.common.TopicAndPartition
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+ val nonNegativeGen = Gen.choose[Int](0, 1000)
+ val positiveGen = Gen.choose[Int](1, 1000)
+ val startOffsetGen = Gen.choose[Long](0L, 1000L)
+ property("FetchThread should set startOffset to iterators") {
+ forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) {
+ (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) =>
+ val topicAndPartition = mock[TopicAndPartition]
+ val consumer = mock[KafkaConsumer]
+ val createConsumer = (tp: TopicAndPartition) => consumer
+ val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+ val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer,
+ incomingQueue, fetchThreshold, fetchSleepMS)
+ fetchThread.setStartOffset(topicAndPartition, startOffset)
+ verify(consumer).setStartOffset(startOffset)
+ }
+ }
+
+ val topicAndPartitionGen = for {
+ topic <- Gen.alphaStr
+ partition <- Gen.choose[Int](0, Int.MaxValue)
+ } yield TopicAndPartition(topic, partition)
+ property("FetchThread should only fetchMessage when the number of messages in queue is below the threshold") {
+ forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) {
+ (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int,
+ startOffset: Long, topicAndPartition: TopicAndPartition) =>
+ val message = mock[KafkaMessage]
+ val consumer = mock[KafkaConsumer]
+ val createConsumer = (tp: TopicAndPartition) => consumer
+ when(consumer.hasNext).thenReturn(true)
+ when(consumer.next).thenReturn(message)
+ val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+ val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer,
+ incomingQueue, fetchThreshold, fetchSleepMS)
+
+ 0.until(messageNum) foreach { _ =>
+ fetchThread.fetchMessage
+ }
+
+ incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold)
+ }
+ }
+
+ property("FetchThread poll should try to retrieve and remove the head of incoming queue") {
+ val topicAndPartition = mock[TopicAndPartition]
+ val consumer = mock[KafkaConsumer]
+ val createConsumer = (tp: TopicAndPartition) => consumer
+ val kafkaMsg = mock[KafkaMessage]
+ val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+ incomingQueue.put(kafkaMsg)
+ val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, incomingQueue, 0, 0)
+ fetchThread.poll shouldBe Some(kafkaMsg)
+ fetchThread.poll shouldBe None
+ }
+
+ val tpAndHasNextGen = for {
+ tp <- topicAndPartitionGen
+ hasNext <- Gen.oneOf(true, false)
+ } yield (tp, hasNext)
+ val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen).map(_.toMap) suchThat (_.nonEmpty)
+ property("FetchThread fetchMessage should return false when there are no more messages from any TopicAndPartition") {
+ forAll(tpHasNextMapGen, nonNegativeGen) {
+ (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) =>
+ val createConsumer = (tp: TopicAndPartition) => {
+ val consumer = mock[KafkaConsumer]
+ val kafkaMsg = mock[KafkaMessage]
+ val hasNext = tpHasNextMap(tp)
+ when(consumer.hasNext).thenReturn(hasNext)
+ when(consumer.next).thenReturn(kafkaMsg)
+ consumer
+ }
+ val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+ val fetchThread = new FetchThread(tpHasNextMap.keys.toArray,
+ createConsumer, incomingQueue, tpHasNextMap.size + 1, fetchSleepMS)
+ fetchThread.fetchMessage shouldBe tpHasNextMap.values.reduce(_ || _)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
new file mode 100644
index 0000000..2669f60
--- /dev/null
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.kafka.lib.consumer
+
+import com.twitter.bijection.Injection
+import kafka.api.OffsetRequest
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.{Message, MessageAndOffset}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+ val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, Array[Byte]](msg)))
+ val messageNumGen = Gen.choose[Int](0, 1000)
+ val topicAndPartitionGen = for {
+ topic <- Gen.alphaStr
+ partition <- Gen.choose[Int](0, Int.MaxValue)
+ } yield (topic, partition)
+
+ property("KafkaConsumer should iterate MessageAndOffset calling hasNext and next") {
+ forAll(messageGen, messageNumGen, topicAndPartitionGen) {
+ (message: Message, num: Int, topicAndPartition: (String, Int)) =>
+ val (topic, partition) = topicAndPartition
+ val consumer = mock[SimpleConsumer]
+ when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
+ OffsetRequest.EarliestTime, -1)).thenReturn(0)
+ val iterator = 0.until(num).map(index => MessageAndOffset(message, index.toLong)).iterator
+ val getIterator = (offset: Long) => iterator
+ val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
+ 0.until(num).foreach { i =>
+ kafkaConsumer.hasNext shouldBe true
+ val kafkaMessage = kafkaConsumer.next
+ kafkaMessage.offset shouldBe i.toLong
+ kafkaMessage.key shouldBe None
+ }
+ kafkaConsumer.hasNext shouldBe false
+ }
+ }
+
+ val startOffsetGen = Gen.choose[Long](1L, 1000L)
+ property("KafkaConsumer setStartOffset should reset internal iterator") {
+ forAll(topicAndPartitionGen, startOffsetGen) {
+ (topicAndPartition: (String, Int), startOffset: Long) =>
+ val (topic, partition) = topicAndPartition
+ val consumer = mock[SimpleConsumer]
+ val getIterator = mock[Long => Iterator[MessageAndOffset]]
+ when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
+ OffsetRequest.EarliestTime, -1)).thenReturn(0)
+ val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
+ kafkaConsumer.setStartOffset(startOffset)
+ verify(getIterator).apply(startOffset)
+ }
+ }
+
+ property("KafkaConsumer close should close SimpleConsumer") {
+ forAll(topicAndPartitionGen) {
+ (topicAndPartition: (String, Int)) =>
+ val (topic, partition) = topicAndPartition
+ val consumer = mock[SimpleConsumer]
+ when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
+ OffsetRequest.EarliestTime, -1)).thenReturn(0)
+ val getIterator = mock[Long => Iterator[MessageAndOffset]]
+ val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
+ kafkaConsumer.close()
+ verify(consumer).close()
+ }
+ }
+}