Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:28 UTC

[18/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala
new file mode 100644
index 0000000..339711b
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka
+
+import java.util.Properties
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success}
+
+import kafka.common.TopicAndPartition
+import org.slf4j.Logger
+
+import org.apache.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage}
+import org.apache.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaOffsetManager, KafkaSourceConfig, KafkaUtil}
+import org.apache.gearpump.streaming.source.DefaultTimeStampFilter
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
+import org.apache.gearpump.streaming.transaction.api._
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+object KafkaSource {
+  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource])
+}
+
+/**
+ * Kafka source connector that pulls batches of messages (`kafka.consumer.emit.batch.size`)
+ * from multiple Kafka TopicAndPartitions in a round-robin way.
+ *
+ * This is a TimeReplayableSource which is able to replay messages given a start time.
+ * Each kafka message is tagged with a timestamp by
+ * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp)
+ * mapping is stored to a [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]].
+ * On recovery, the previously stored offset is retrieved from the
+ * [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] by timestamp and reading
+ * resumes from there.
+ *
+ * Each kafka message is wrapped into a gearpump [[org.apache.gearpump.Message]] and further
+ * filtered by a [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]]
+ * so that obsolete messages are dropped.
+ *
+ * @param config kafka source config
+ * @param offsetStorageFactory factory to build [[OffsetStorage]]
+ * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes
+ * @param timestampFilter filters out messages based on timestamp
+ * @param fetchThread fetches messages and puts them on an in-memory queue
+ * @param offsetManagers manages offset-to-timestamp storage for each kafka.common.TopicAndPartition
+ */
+class KafkaSource(
+    config: KafkaSourceConfig,
+    offsetStorageFactory: OffsetStorageFactory,
+    messageDecoder: MessageDecoder = new DefaultMessageDecoder,
+    timestampFilter: TimeStampFilter = new DefaultTimeStampFilter,
+    private var fetchThread: Option[FetchThread] = None,
+    private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = {
+      Map.empty[TopicAndPartition, KafkaOffsetManager]
+    }) extends TimeReplayableSource {
+  import org.apache.gearpump.streaming.kafka.KafkaSource._
+
+  private var startTime: Option[TimeStamp] = None
+
+  /**
+   * Constructs a KafkaSource from a comma-separated topics string and kafka consumer properties.
+   *
+   * @param topics comma-separated string of topics
+   * @param properties kafka consumer config
+   * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
+   *     that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
+   *
+   */
+  def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory) = {
+    this(KafkaSourceConfig(properties).withConsumerTopics(topics), offsetStorageFactory)
+  }
+  /**
+   * Constructs a KafkaSource with a custom message decoder and timestamp filter.
+   *
+   * @param topics comma-separated string of topics
+   * @param properties kafka consumer config
+   * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
+   *   that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
+   * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes
+   * @param timestampFilter filters out messages based on timestamp
+   */
+  def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory,
+      messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter) = {
+    this(KafkaSourceConfig(properties)
+      .withConsumerTopics(topics), offsetStorageFactory,
+      messageDecoder, timestampFilter)
+  }
+
+  /**
+   * Constructs a KafkaSource from a comma-separated topics string and a zookeeper connect string.
+   *
+   * @param topics comma-separated string of topics
+   * @param zkConnect kafka consumer config `zookeeper.connect`
+   * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
+   *     that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
+   */
+  def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory) =
+    this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory)
+
+  /**
+   * Constructs a KafkaSource with a custom message decoder and timestamp filter.
+   *
+   * @param topics comma-separated string of topics
+   * @param zkConnect kafka consumer config `zookeeper.connect`
+   * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
+   *     that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
+   * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes
+   * @param timestampFilter filters out messages based on timestamp
+   */
+  def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory,
+      messageDecoder: MessageDecoder,
+      timestampFilter: TimeStampFilter) = {
+    this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory,
+      messageDecoder, timestampFilter)
+  }
+
+  LOG.debug(s"assigned ${offsetManagers.keySet}")
+
+  private[kafka] def setStartTime(startTime: Option[TimeStamp]): Unit = {
+    this.startTime = startTime
+    fetchThread.foreach { fetch =>
+      this.startTime.foreach { time =>
+        offsetManagers.foreach { case (tp, offsetManager) =>
+          offsetManager.resolveOffset(time) match {
+            case Success(offset) =>
+              LOG.debug(s"set start offset to $offset for $tp")
+              fetch.setStartOffset(tp, offset)
+            case Failure(StorageEmpty) =>
+              LOG.debug(s"no previous TimeStamp stored")
+            case Failure(e) => throw e
+          }
+        }
+      }
+      fetch.setDaemon(true)
+      fetch.start()
+    }
+  }
+
+  override def open(context: TaskContext, startTime: TimeStamp): Unit = {
+    import context.{appId, appName, parallelism, taskId}
+
+    val topics = config.getConsumerTopics
+    val grouper = config.getGrouper
+    val consumerConfig = config.consumerConfig
+    val topicAndPartitions = grouper.group(parallelism, taskId.index,
+      KafkaUtil.getTopicAndPartitions(KafkaUtil.connectZookeeper(consumerConfig)(), topics))
+    this.fetchThread = Some(FetchThread(topicAndPartitions, config.getFetchThreshold,
+      config.getFetchSleepMS, config.getConsumerStartOffset, consumerConfig))
+    this.offsetManagers = topicAndPartitions.map { tp =>
+      val storageTopic = s"app${appId}_${appName}_${tp.topic}_${tp.partition}"
+      val storage = offsetStorageFactory.getOffsetStorage(storageTopic)
+      tp -> new KafkaOffsetManager(storage)
+    }.toMap
+
+    setStartTime(Option(startTime))
+  }
+
+  override def read(): Message = {
+    fetchThread.flatMap(_.poll.flatMap(filterMessage)).orNull
+  }
+
+  private def filterMessage(kafkaMsg: KafkaMessage): Option[Message] = {
+    val msgOpt = offsetManagers(kafkaMsg.topicAndPartition)
+      .filter(messageDecoder.fromBytes(kafkaMsg.msg) -> kafkaMsg.offset)
+    msgOpt.flatMap { msg =>
+      startTime match {
+        case None =>
+          Some(msg)
+        case Some(time) =>
+          timestampFilter.filter(msg, time)
+      }
+    }
+  }
+
+  override def close(): Unit = {
+    offsetManagers.foreach(_._2.close())
+  }
+}
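
A minimal sketch of building the source above, assuming a local ZooKeeper quorum and broker list; topic names are placeholders:

    import org.apache.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}

    // Persists the (offset, timestamp) mapping back to kafka via KafkaStorage.
    val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092")

    // Pulls from two topics; DefaultMessageDecoder and DefaultTimeStampFilter are used
    // unless custom ones are supplied through the longer constructors.
    val source = new KafkaSource("topic1,topic2", "localhost:2181", offsetStorageFactory)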

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala
new file mode 100644
index 0000000..8748999
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka
+
+import java.util.Properties
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+import com.twitter.bijection.Injection
+import kafka.api.OffsetRequest
+import kafka.consumer.ConsumerConfig
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.slf4j.Logger
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.kafka.lib.KafkaUtil
+import org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
+import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
+import org.apache.gearpump.streaming.transaction.api.{OffsetStorage, OffsetStorageFactory}
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Factory that builds [[KafkaStorage]]
+ *
+ * @param consumerProps kafka consumer config
+ * @param producerProps kafka producer config
+ */
+class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties)
+  extends OffsetStorageFactory {
+
+  /**
+   * Creates consumer config properties with `zookeeper.connect` set to zkConnect
+   * and producer config properties with `bootstrap.servers` set to bootstrapServers
+   *
+   * @param zkConnect kafka consumer config `zookeeper.connect`
+   * @param bootstrapServers kafka producer config `bootstrap.servers`
+   */
+  def this(zkConnect: String, bootstrapServers: String) =
+    this(KafkaUtil.buildConsumerConfig(zkConnect), KafkaUtil.buildProducerConfig(bootstrapServers))
+
+  override def getOffsetStorage(dir: String): OffsetStorage = {
+    val topic = dir
+    val consumerConfig = new ConsumerConfig(consumerProps)
+    val getConsumer = () => KafkaConsumer(topic, 0, OffsetRequest.EarliestTime, consumerConfig)
+    new KafkaStorage(topic, KafkaUtil.createKafkaProducer[Array[Byte], Array[Byte]](
+      producerProps, new ByteArraySerializer, new ByteArraySerializer),
+      getConsumer(), KafkaUtil.connectZookeeper(consumerConfig)())
+  }
+}
+
+object KafkaStorage {
+  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaStorage])
+}
+
+/**
+ * Stores offset-timestamp mapping to kafka
+ *
+ * @param topic kafka store topic
+ * @param producer kafka producer
+ * @param getConsumer function to get kafka consumer
+ * @param connectZk function to connect zookeeper
+ */
+class KafkaStorage private[kafka](
+    topic: String,
+    producer: KafkaProducer[Array[Byte], Array[Byte]],
+    getConsumer: => KafkaConsumer,
+    connectZk: => ZkClient)
+  extends OffsetStorage {
+
+  private lazy val consumer = getConsumer
+
+  private val dataByTime: List[(TimeStamp, Array[Byte])] = {
+    if (KafkaUtil.topicExists(connectZk, topic)) {
+      load(consumer)
+    } else {
+      List.empty[(TimeStamp, Array[Byte])]
+    }
+  }
+
+  /**
+   * Offsets with timestamp less than `time` have already been processed by the system,
+   * so on replay we look up the storage for the first offset whose timestamp is greater
+   * than or equal to `time`.
+   *
+   * @param time the timestamp to look up for the earliest unprocessed offset
+   * @return the earliest unprocessed offset if `time` is in the range, otherwise failure
+   */
+  override def lookUp(time: TimeStamp): Try[Array[Byte]] = {
+    if (dataByTime.isEmpty) {
+      Failure(StorageEmpty)
+    } else {
+      val min = dataByTime.head
+      val max = dataByTime.last
+      if (time < min._1) {
+        Failure(Underflow(min._2))
+      } else if (time > max._1) {
+        Failure(Overflow(max._2))
+      } else {
+        Success(dataByTime.find(_._1 >= time).get._2)
+      }
+    }
+  }
+
+  override def append(time: TimeStamp, offset: Array[Byte]): Unit = {
+    val message = new ProducerRecord[Array[Byte], Array[Byte]](
+      topic, 0, Injection[Long, Array[Byte]](time), offset)
+    producer.send(message)
+  }
+
+  override def close(): Unit = {
+    producer.close()
+    KafkaUtil.deleteTopic(connectZk, topic)
+  }
+
+  private[kafka] def load(consumer: KafkaConsumer): List[(TimeStamp, Array[Byte])] = {
+    var messagesBuilder = new mutable.ArrayBuilder.ofRef[(TimeStamp, Array[Byte])]
+    while (consumer.hasNext) {
+      val kafkaMsg = consumer.next
+      kafkaMsg.key.map { k =>
+        Injection.invert[TimeStamp, Array[Byte]](k) match {
+          case Success(time) =>
+            messagesBuilder += (time -> kafkaMsg.msg)
+          case Failure(e) => throw e
+        }
+      } orElse (throw new RuntimeException("offset key should not be null"))
+    }
+    consumer.close()
+    messagesBuilder.result().toList
+  }
+}
\ No newline at end of file
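
A stand-alone sketch of the lookUp contract above, over a time-sorted (timestamp, offset) list; the numbers and the simplified Long offsets are made up for illustration:

    import scala.util.{Failure, Success, Try}

    // (timestamp -> offset) pairs sorted by timestamp, standing in for dataByTime.
    val dataByTime: List[(Long, Long)] = List(100L -> 0L, 200L -> 5L, 300L -> 9L)

    def lookUp(time: Long): Try[Long] = {
      if (dataByTime.isEmpty) Failure(new RuntimeException("storage empty"))          // StorageEmpty
      else if (time < dataByTime.head._1) Failure(new RuntimeException("underflow"))  // Underflow(min)
      else if (time > dataByTime.last._1) Failure(new RuntimeException("overflow"))   // Overflow(max)
      else Success(dataByTime.find(_._1 >= time).get._2)
    }

    // lookUp(150L) == Success(5L): offset 5 is the first whose timestamp (200) >= 150.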

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
new file mode 100644
index 0000000..5f48b43
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka.dsl
+
+import java.util.Properties
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl
+import org.apache.gearpump.streaming.kafka.KafkaSink
+
+class KafkaDSLSink[T](stream: dsl.Stream[T]) {
+
+  /** Create a Kafka DSL Sink */
+  def writeToKafka(
+      topic: String,
+      bootstrapServers: String,
+      parallelism: Int,
+      description: String): dsl.Stream[T] = {
+    stream.sink(new KafkaSink(topic, bootstrapServers), parallelism, UserConfig.empty, description)
+  }
+
+  def writeToKafka(
+      topic: String,
+      properties: Properties,
+      parallelism: Int,
+      description: String): dsl.Stream[T] = {
+    stream.sink(new KafkaSink(topic, properties), parallelism, UserConfig.empty, description)
+  }
+}
+
+object KafkaDSLSink {
+
+  import scala.language.implicitConversions
+
+  implicit def streamToKafkaDSLSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] = {
+    new KafkaDSLSink[T](stream)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
new file mode 100644
index 0000000..0275966
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka.dsl
+
+import java.util.Properties
+
+import org.apache.gearpump.streaming.dsl
+import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.kafka.KafkaSource
+import org.apache.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig}
+import org.apache.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter}
+
+object KafkaDSLUtil {
+  def createStream[T](
+      app: StreamApp,
+      parallelism: Int,
+      description: String,
+      kafkaConfig: KafkaSourceConfig,
+      offsetStorageFactory: OffsetStorageFactory,
+      messageDecoder: MessageDecoder = new DefaultMessageDecoder): dsl.Stream[T] = {
+    app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder),
+      parallelism, description)
+  }
+
+  def createStream[T](
+      app: StreamApp,
+      parallelism: Int,
+      description: String,
+      topics: String,
+      zkConnect: String,
+      offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
+    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory),
+      parallelism, description)
+  }
+
+  def createStream[T](
+      app: StreamApp,
+      parallelism: Int,
+      description: String,
+      topics: String,
+      zkConnect: String,
+      offsetStorageFactory: OffsetStorageFactory,
+      messageDecoder: MessageDecoder,
+      timestampFilter: TimeStampFilter): dsl.Stream[T] = {
+    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory,
+    messageDecoder, timestampFilter), parallelism, description)
+  }
+
+  def createStream[T](
+      app: StreamApp,
+      parallelism: Int,
+      description: String,
+      topics: String,
+      properties: Properties,
+      offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
+    app.source[T](new KafkaSource(topics, properties, offsetStorageFactory),
+    parallelism, description)
+  }
+
+  def createStream[T](
+      app: StreamApp,
+      topics: String,
+      parallelism: Int,
+      description: String,
+      properties: Properties,
+      offsetStorageFactory: OffsetStorageFactory,
+      messageDecoder: MessageDecoder,
+      timestampFilter: TimeStampFilter): dsl.Stream[T] = {
+    app.source[T](new KafkaSource(topics, properties, offsetStorageFactory,
+    messageDecoder, timestampFilter), parallelism, description)
+  }
+}
+
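
A sketch of wiring the DSL helpers together, assuming a StreamApp named `app` created elsewhere and a local cluster; topic names and addresses are placeholders:

    import org.apache.gearpump.streaming.kafka.KafkaStorageFactory
    import org.apache.gearpump.streaming.kafka.dsl.KafkaDSLUtil
    import org.apache.gearpump.streaming.kafka.dsl.KafkaDSLSink._

    val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092")

    // Read raw bytes from "inputTopic" with one source task and write them to "outputTopic".
    KafkaDSLUtil
      .createStream[Array[Byte]](app, 1, "kafka-source", "inputTopic", "localhost:2181",
        offsetStorageFactory)
      .writeToKafka("outputTopic", "localhost:9092", 1, "kafka-sink")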

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
new file mode 100644
index 0000000..ea7e8d1
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import scala.util.{Failure, Success}
+
+import com.twitter.bijection.Injection
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.transaction.api.MessageDecoder
+
+class DefaultMessageDecoder extends MessageDecoder {
+  override def fromBytes(bytes: Array[Byte]): Message = {
+    Message(bytes, System.currentTimeMillis())
+  }
+}
+
+class StringMessageDecoder extends MessageDecoder {
+  override def fromBytes(bytes: Array[Byte]): Message = {
+    Injection.invert[String, Array[Byte]](bytes) match {
+      case Success(s) => Message(s, System.currentTimeMillis())
+      case Failure(e) => throw e
+    }
+  }
+}
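
The decoders above stamp each message with the current wall-clock time. A sketch of a custom decoder that reads an event time from the payload instead, assuming (purely for illustration) that each payload starts with an 8-byte timestamp:

    import java.nio.ByteBuffer

    import org.apache.gearpump.Message
    import org.apache.gearpump.streaming.transaction.api.MessageDecoder

    class EventTimeMessageDecoder extends MessageDecoder {
      override def fromBytes(bytes: Array[Byte]): Message = {
        val buf = ByteBuffer.wrap(bytes)
        val eventTime = buf.getLong()               // assumed 8-byte timestamp prefix
        val body = new Array[Byte](buf.remaining())
        buf.get(body)                               // remaining bytes are the payload
        Message(body, eventTime)
      }
    }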

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
new file mode 100644
index 0000000..88f509b
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import scala.util.{Failure, Success, Try}
+
+import com.twitter.bijection.Injection
+import org.slf4j.Logger
+
+import org.apache.gearpump._
+import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
+import org.apache.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage}
+import org.apache.gearpump.util.LogUtil
+
+object KafkaOffsetManager {
+  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaOffsetManager])
+}
+
+private[kafka] class KafkaOffsetManager(storage: OffsetStorage) extends OffsetManager {
+  import org.apache.gearpump.streaming.kafka.lib.KafkaOffsetManager._
+
+  var maxTime: TimeStamp = 0L
+
+  override def filter(messageAndOffset: (Message, Long)): Option[Message] = {
+    val (message, offset) = messageAndOffset
+    if (message.timestamp > maxTime) {
+      maxTime = message.timestamp
+      storage.append(maxTime, Injection[Long, Array[Byte]](offset))
+    }
+    Some(message)
+  }
+
+  override def resolveOffset(time: TimeStamp): Try[Long] = {
+    storage.lookUp(time) match {
+      case Success(offset) => Injection.invert[Long, Array[Byte]](offset)
+      case Failure(Overflow(max)) =>
+        LOG.warn(s"start time larger than the max stored TimeStamp; set to max offset")
+        Injection.invert[Long, Array[Byte]](max)
+      case Failure(Underflow(min)) =>
+        LOG.warn(s"start time less than the min stored TimeStamp; set to min offset")
+        Injection.invert[Long, Array[Byte]](min)
+      case Failure(StorageEmpty) => Failure(StorageEmpty)
+      case Failure(e) => throw e
+    }
+  }
+
+  override def close(): Unit = {
+    storage.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
new file mode 100644
index 0000000..ade414e
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import java.util.Properties
+
+import kafka.api.OffsetRequest
+import kafka.consumer.ConsumerConfig
+import org.slf4j.Logger
+
+import org.apache.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper}
+import org.apache.gearpump.util.LogUtil
+
+object KafkaSourceConfig {
+
+  val NAME = "kafka_config"
+
+  val ZOOKEEPER_CONNECT = "zookeeper.connect"
+  val GROUP_ID = "group.id"
+  val CONSUMER_START_OFFSET = "kafka.consumer.start.offset"
+  val CONSUMER_TOPICS = "kafka.consumer.topics"
+  val FETCH_THRESHOLD = "kafka.consumer.fetch.threshold"
+  val FETCH_SLEEP_MS = "kafka.consumer.fetch.sleep.ms"
+  val GROUPER_CLASS = "kafka.grouper.class"
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  def apply(consumerProps: Properties): KafkaSourceConfig = new KafkaSourceConfig(consumerProps)
+}
+
+/**
+ * Wraps the properties for kafka.consumer.ConsumerConfig along with config specific to
+ * [[org.apache.gearpump.streaming.kafka.KafkaSource]]
+ *
+ * @param consumerProps kafka consumer config
+ */
+class KafkaSourceConfig(val consumerProps: Properties = new Properties)
+  extends java.io.Serializable {
+  import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig._
+
+  if (!consumerProps.containsKey(ZOOKEEPER_CONNECT)) {
+    consumerProps.setProperty(ZOOKEEPER_CONNECT, "localhost:2181")
+  }
+
+  if (!consumerProps.containsKey(GROUP_ID)) {
+    consumerProps.setProperty(GROUP_ID, "gearpump")
+  }
+
+  def consumerConfig: ConsumerConfig = new ConsumerConfig(consumerProps)
+
+  /**
+   * Sets kafka consumer topics, separated by commas.
+   *
+   * @param topics comma-separated string
+   * @return new KafkaConfig based on this but with
+   *         [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#CONSUMER_TOPICS]]
+   *         set to given value
+   */
+  def withConsumerTopics(topics: String): KafkaSourceConfig = {
+    consumerProps.setProperty(CONSUMER_TOPICS, topics)
+    KafkaSourceConfig(consumerProps)
+  }
+
+  /**
+   * Returns a list of kafka consumer topics
+   */
+  def getConsumerTopics: List[String] = {
+    Option(consumerProps.getProperty(CONSUMER_TOPICS)).getOrElse("topic1").split(",").toList
+  }
+
+  /**
+   * Sets the sleep interval for when there are no more messages or the message buffer is full.
+   *
+   * The consumer FetchThread will sleep for a while if there are no more messages or
+   * the incoming queue size is above the
+   * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]
+   *
+   * @param sleepMS sleep interval in milliseconds
+   * @return new KafkaConfig based on this but with
+   *         [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_SLEEP_MS]]
+   *         set to given value
+   */
+  def withFetchSleepMS(sleepMS: Int): KafkaSourceConfig = {
+    consumerProps.setProperty(FETCH_SLEEP_MS, s"$sleepMS")
+    KafkaSourceConfig(consumerProps)
+  }
+
+  /**
+   * Gets the sleep interval.
+   *
+   * The consumer FetchThread sleeps for a while if there are no more messages or
+   * the incoming queue is full (its size exceeds the
+   * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]])
+   *
+   * @return sleep interval in milliseconds
+   */
+  def getFetchSleepMS: Int = {
+    Option(consumerProps.getProperty(FETCH_SLEEP_MS)).getOrElse("100").toInt
+  }
+
+  /**
+   * Sets the fetch threshold.
+   *
+   * The consumer FetchThread stops fetching new messages when its incoming queue
+   * size is above the threshold and starts again when the queue size drops below it.
+   *
+   * @param threshold queue size threshold
+   * @return new KafkaConfig based on this but with
+   *         [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]
+   *         set to given value
+   */
+  def withFetchThreshold(threshold: Int): KafkaSourceConfig = {
+    consumerProps.setProperty(FETCH_THRESHOLD, s"$threshold")
+    KafkaSourceConfig(consumerProps)
+  }
+
+  /**
+   * Returns the fetch threshold.
+   *
+   * The consumer FetchThread stops fetching new messages when its incoming queue size is
+   * above the threshold and starts again when the queue size drops below it.
+   *
+   * @return fetch threshold
+   */
+  def getFetchThreshold: Int = {
+    Option(consumerProps.getProperty(FETCH_THRESHOLD)).getOrElse("10000").toInt
+  }
+
+  /**
+   * Sets [[org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which
+   * defines how kafka.common.TopicAndPartitions are mapped to source tasks.
+   *
+   * @param className name of the factory class
+   * @return new KafkaConfig based on this but with
+   *         [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#GROUPER_CLASS]]
+   *         set to given value
+   */
+  def withGrouper(className: String): KafkaSourceConfig = {
+    consumerProps.setProperty(GROUPER_CLASS, className)
+    KafkaSourceConfig(consumerProps)
+  }
+
+  /**
+   * Returns [[org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which
+   * defines how kafka.common.TopicAndPartitions are mapped to source tasks
+   */
+  def getGrouper: KafkaGrouper = {
+    Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS))
+      .getOrElse(classOf[KafkaDefaultGrouper].getName)).newInstance().asInstanceOf[KafkaGrouper]
+  }
+
+  def withConsumerStartOffset(earliestOrLatest: Long): KafkaSourceConfig = {
+    consumerProps.setProperty(CONSUMER_START_OFFSET, s"$earliestOrLatest")
+    KafkaSourceConfig(consumerProps)
+  }
+
+  def getConsumerStartOffset: Long = {
+    Option(consumerProps.getProperty(CONSUMER_START_OFFSET))
+      .getOrElse(s"${OffsetRequest.EarliestTime}").toLong
+  }
+}
+
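
A sketch of building a source config with the setters above; addresses and topic names are placeholders:

    import java.util.Properties

    import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig

    val props = new Properties()
    props.setProperty("zookeeper.connect", "localhost:2181")
    props.setProperty("group.id", "gearpump")

    // Each with* call writes the value into the underlying properties and returns a new wrapper.
    val sourceConfig = KafkaSourceConfig(props)
      .withConsumerTopics("topic1,topic2")
      .withFetchThreshold(5000)
      .withFetchSleepMS(200)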

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala
new file mode 100644
index 0000000..e8cf574
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import java.io.InputStream
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.consumer.ConsumerConfig
+import kafka.utils.{ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
+import org.apache.kafka.common.serialization.Serializer
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.LogUtil
+
+object KafkaUtil {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  def getBroker(connectZk: => ZkClient, topic: String, partition: Int): Broker = {
+    val zkClient = connectZk
+    try {
+      val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+        .getOrElse(throw new RuntimeException(
+          s"leader not available for TopicAndPartition($topic, $partition)"))
+      ZkUtils.getBrokerInfo(zkClient, leader)
+        .getOrElse(throw new RuntimeException(s"broker info not found for leader $leader"))
+    } catch {
+      case e: Exception =>
+        LOG.error(e.getMessage)
+        throw e
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String])
+    : Array[TopicAndPartition] = {
+    val zkClient = connectZk
+    try {
+      ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap {
+        case (topic, partitions) => partitions.map(TopicAndPartition(topic, _))
+      }.toArray
+    } catch {
+      case e: Exception =>
+        LOG.error(e.getMessage)
+        throw e
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  def topicExists(connectZk: => ZkClient, topic: String): Boolean = {
+    val zkClient = connectZk
+    try {
+      AdminUtils.topicExists(zkClient, topic)
+    } catch {
+      case e: Exception =>
+        LOG.error(e.getMessage)
+        throw e
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  /**
+   * Creates a new kafka topic if it does not exist.
+   * Returns true if the topic already exists, and false otherwise.
+   */
+  def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int)
+    : Boolean = {
+    val zkClient = connectZk
+    try {
+      if (AdminUtils.topicExists(zkClient, topic)) {
+        LOG.info(s"topic $topic exists")
+        true
+      } else {
+        AdminUtils.createTopic(zkClient, topic, partitions, replicas)
+        LOG.info(s"created topic $topic")
+        false
+      }
+    } catch {
+      case e: Exception =>
+        LOG.error(e.getMessage)
+        throw e
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  def deleteTopic(connectZk: => ZkClient, topic: String): Unit = {
+    val zkClient = connectZk
+    try {
+      AdminUtils.deleteTopic(zkClient, topic)
+    } catch {
+      case e: Exception =>
+        LOG.error(e.getMessage)
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  def connectZookeeper(config: ConsumerConfig): () => ZkClient = {
+    val zookeeperConnect = config.zkConnect
+    val sessionTimeout = config.zkSessionTimeoutMs
+    val connectionTimeout = config.zkConnectionTimeoutMs
+    () => new ZkClient(zookeeperConnect, sessionTimeout, connectionTimeout, ZKStringSerializer)
+  }
+
+  def loadProperties(filename: String): Properties = {
+    val props = new Properties()
+    var propStream: InputStream = null
+    try {
+      propStream = getClass.getClassLoader.getResourceAsStream(filename)
+      props.load(propStream)
+    } catch {
+      case e: Exception =>
+        LOG.error(s"$filename not found")
+    } finally {
+      if (propStream != null) {
+        propStream.close()
+      }
+    }
+    props
+  }
+
+  def createKafkaProducer[K, V](properties: Properties,
+      keySerializer: Serializer[K],
+      valueSerializer: Serializer[V]): KafkaProducer[K, V] = {
+    if (properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
+      properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+    }
+    new KafkaProducer[K, V](properties, keySerializer, valueSerializer)
+  }
+
+  def buildProducerConfig(bootstrapServers: String): Properties = {
+    val properties = new Properties()
+    properties.setProperty("bootstrap.servers", bootstrapServers)
+    properties
+  }
+
+  def buildConsumerConfig(zkConnect: String): Properties = {
+    val properties = new Properties()
+    properties.setProperty("zookeeper.connect", zkConnect)
+    properties.setProperty("group.id", "gearpump")
+    properties
+  }
+}
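
A sketch of using the helpers above to create a topic, assuming a local ZooKeeper quorum; the topic name is a placeholder:

    import kafka.consumer.ConsumerConfig

    import org.apache.gearpump.streaming.kafka.lib.KafkaUtil

    val consumerConfig = new ConsumerConfig(KafkaUtil.buildConsumerConfig("localhost:2181"))
    val connectZk = KafkaUtil.connectZookeeper(consumerConfig)

    // Returns true if the topic already existed, false if it was just created.
    val existed = KafkaUtil.createTopic(connectZk(), "gearpump_offsets", partitions = 1, replicas = 1)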

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
new file mode 100644
index 0000000..ce17f5a
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+/**
+ * Sleeps for an exponentially increasing duration on each call
+ * until the maximum cap is reached.
+ *
+ * @param backOffMultiplier The factor by which the duration increases.
+ * @param initialDurationMs Time in milliseconds for initial sleep.
+ * @param maximumDurationMs Cap up to which we will increase the duration.
+ */
+private[consumer] class ExponentialBackoffSleeper(
+    backOffMultiplier: Double = 2.0,
+    initialDurationMs: Long = 100,
+    maximumDurationMs: Long = 10000) {
+
+  require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
+  require(initialDurationMs > 0, "initialDurationMs must be positive")
+  require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be >= initialDurationMs")
+
+  private var sleepDuration = initialDurationMs
+
+  def reset(): Unit = {
+    sleepDuration = initialDurationMs
+  }
+
+  def sleep(): Unit = {
+    Thread.sleep(sleepDuration)
+    setNextSleepDuration()
+  }
+
+  def getSleepDuration: Long = sleepDuration
+
+  def setNextSleepDuration(): Unit = {
+    val next = (sleepDuration * backOffMultiplier).asInstanceOf[Long]
+    sleepDuration = math.min(math.max(initialDurationMs, next), maximumDurationMs)
+  }
+}
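
With the defaults above, successive sleep() calls wait roughly 100, 200, 400, ... milliseconds until capped at 10 seconds; the sequence can be reproduced stand-alone:

    // 100, 200, 400, 800, 1600, 3200, 6400, then capped at 10000 milliseconds.
    val durations = Iterator.iterate(100L)(d => math.min(d * 2, 10000L)).take(10).toList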

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
new file mode 100644
index 0000000..8550207
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+import java.nio.channels.ClosedByInterruptException
+import java.util.concurrent.LinkedBlockingQueue
+
+import kafka.common.TopicAndPartition
+import kafka.consumer.ConsumerConfig
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.LogUtil
+
+object FetchThread {
+  private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread])
+
+  def apply(topicAndPartitions: Array[TopicAndPartition],
+      fetchThreshold: Int,
+      fetchSleepMS: Long,
+      startOffsetTime: Long,
+      consumerConfig: ConsumerConfig): FetchThread = {
+    val createConsumer = (tp: TopicAndPartition) =>
+      KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)
+
+    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+    new FetchThread(topicAndPartitions, createConsumer, incomingQueue, fetchThreshold, fetchSleepMS)
+  }
+}
+
+/**
+ * A thread that fetches messages from multiple kafka.common.TopicAndPartitions and puts them
+ * onto a queue, which is asynchronously polled by a consumer
+ *
+ * @param createConsumer given a kafka.common.TopicAndPartition, creates a
+ *                       [[org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] to
+ *                       connect to it
+ * @param incomingQueue a queue to buffer incoming messages
+ * @param fetchThreshold above which thread should stop fetching messages
+ * @param fetchSleepMS interval to sleep when no more messages or hitting fetchThreshold
+ */
+private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
+    createConsumer: TopicAndPartition => KafkaConsumer,
+    incomingQueue: LinkedBlockingQueue[KafkaMessage],
+    fetchThreshold: Int,
+    fetchSleepMS: Long) extends Thread {
+  import org.apache.gearpump.streaming.kafka.lib.consumer.FetchThread._
+
+  private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers
+
+  def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = {
+    consumers(tp).setStartOffset(startOffset)
+  }
+
+  def poll: Option[KafkaMessage] = {
+    Option(incomingQueue.poll())
+  }
+
+  override def run(): Unit = {
+    try {
+      var nextOffsets = Map.empty[TopicAndPartition, Long]
+      var reset = false
+      val sleeper = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0,
+        initialDurationMs = 100L,
+        maximumDurationMs = 10000L)
+      while (!Thread.currentThread().isInterrupted) {
+        try {
+          if (reset) {
+            nextOffsets = consumers.mapValues(_.getNextOffset)
+            resetConsumers(nextOffsets)
+            reset = false
+          }
+          val hasMoreMessages = fetchMessage
+          sleeper.reset()
+          if (!hasMoreMessages) {
+            Thread.sleep(fetchSleepMS)
+          }
+        } catch {
+          case exception: Exception =>
+            LOG.warn(s"resetting consumers due to $exception")
+            reset = true
+            sleeper.sleep()
+        }
+      }
+    } catch {
+      case e: InterruptedException => LOG.info("fetch thread got interrupted exception")
+      case e: ClosedByInterruptException => LOG.info("fetch thread closed by interrupt exception")
+    } finally {
+      consumers.values.foreach(_.close())
+    }
+  }
+
+  /**
+   * Fetches messages from each TopicAndPartition in a round-robin way
+   */
+  def fetchMessage: Boolean = {
+    consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
+      val (_, consumer) = tpAndConsumer
+      if (incomingQueue.size < fetchThreshold) {
+        if (consumer.hasNext) {
+          incomingQueue.put(consumer.next())
+          true
+        } else {
+          hasNext
+        }
+      } else {
+        true
+      }
+    }
+  }
+
+  private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = {
+    topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap
+  }
+
+  private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit = {
+    consumers.values.foreach(_.close())
+    consumers = createAllConsumers
+    consumers.foreach { case (tp, consumer) =>
+      consumer.setStartOffset(nextOffsets(tp))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
new file mode 100644
index 0000000..55c327b
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.common.ErrorMapping._
+import kafka.common.TopicAndPartition
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.MessageAndOffset
+import kafka.utils.Utils
+
+import org.apache.gearpump.streaming.kafka.lib.KafkaUtil
+
+object KafkaConsumer {
+  def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig)
+    : KafkaConsumer = {
+    val connectZk = KafkaUtil.connectZookeeper(config)
+    val broker = KafkaUtil.getBroker(connectZk(), topic, partition)
+    val soTimeout = config.socketTimeoutMs
+    val soBufferSize = config.socketReceiveBufferBytes
+    val fetchSize = config.fetchMessageMaxBytes
+    val clientId = config.clientId
+    val consumer = new SimpleConsumer(broker.host, broker.port, soTimeout, soBufferSize, clientId)
+    val getIterator = (offset: Long) => {
+      val request = new FetchRequestBuilder()
+        .addFetch(topic, partition, offset, fetchSize)
+        .build()
+
+      val response = consumer.fetch(request)
+      response.errorCode(topic, partition) match {
+        case NoError => response.messageSet(topic, partition).iterator
+        case error => throw exceptionFor(error)
+      }
+    }
+    new KafkaConsumer(consumer, topic, partition, getIterator, startOffsetTime)
+  }
+}
+
+/**
+ * Uses kafka.consumer.SimpleConsumer to consume and iterate over
+ * messages from a kafka.common.TopicAndPartition.
+ */
+class KafkaConsumer(consumer: SimpleConsumer,
+    topic: String,
+    partition: Int,
+    getIterator: (Long) => Iterator[MessageAndOffset],
+    startOffsetTime: Long = OffsetRequest.EarliestTime) {
+  private val earliestOffset = consumer
+    .earliestOrLatestOffset(TopicAndPartition(topic, partition), startOffsetTime, -1)
+  private var nextOffset: Long = earliestOffset
+  private var iterator: Iterator[MessageAndOffset] = getIterator(nextOffset)
+
+  def setStartOffset(startOffset: Long): Unit = {
+    nextOffset = startOffset
+    iterator = getIterator(nextOffset)
+  }
+
+  def next(): KafkaMessage = {
+    val mo = iterator.next()
+    val message = mo.message
+
+    nextOffset = mo.nextOffset
+
+    val offset = mo.offset
+    val payload = Utils.readBytes(message.payload)
+    new KafkaMessage(topic, partition, offset, Option(message.key).map(Utils.readBytes), payload)
+  }
+
+  def hasNext: Boolean = {
+    @annotation.tailrec
+    def hasNextHelper(iter: Iterator[MessageAndOffset], newIterator: Boolean): Boolean = {
+      if (iter.hasNext) true
+      else if (newIterator) false
+      else {
+        iterator = getIterator(nextOffset)
+        hasNextHelper(iterator, newIterator = true)
+      }
+    }
+    hasNextHelper(iterator, newIterator = false)
+  }
+
+  def getNextOffset: Long = nextOffset
+
+  def close(): Unit = {
+    consumer.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
new file mode 100644
index 0000000..e0813d9
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Wrapper over messages from kafka
+ * @param topicAndPartition where the message comes from
+ * @param offset message offset in the kafka partition
+ * @param key optional message key, may be None
+ * @param msg message payload
+ */
+case class KafkaMessage(topicAndPartition: TopicAndPartition, offset: Long,
+    key: Option[Array[Byte]], msg: Array[Byte]) {
+
+  def this(topic: String, partition: Int, offset: Long,
+      key: Option[Array[Byte]], msg: Array[Byte]) = {
+    this(TopicAndPartition(topic, partition), offset, key, msg)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
new file mode 100644
index 0000000..b34bf09
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.grouper
+
+import kafka.common.TopicAndPartition
+
+/**
+ * The default grouper assigns TopicAndPartitions to StreamProducers in a round-robin way.
+ *
+ * e.g. given 2 topics (topicA with 2 partitions and topicB with 3 partitions) and
+ * 2 streamProducers (streamProducer0 and streamProducer1),
+ *
+ * streamProducer0 gets (topicA, partition1), (topicB, partition1) and (topicB, partition3);
+ * streamProducer1 gets (topicA, partition2) and (topicB, partition2)
+ */
+class KafkaDefaultGrouper extends KafkaGrouper {
+  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+    : Array[TopicAndPartition] = {
+    topicAndPartitions.indices.filter(_ % taskNum == taskIndex)
+      .map(i => topicAndPartitions(i)).toArray
+  }
+}
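
To make the round-robin assignment in the scaladoc concrete, here is a small sketch that runs the grouper against the example layout from the comment (topicA with 2 partitions, topicB with 3 partitions); the topic names and partition numbers are just the values used in that example:

  import kafka.common.TopicAndPartition
  import org.apache.gearpump.streaming.kafka.lib.grouper.KafkaDefaultGrouper

  val topicAndPartitions = Array(
    TopicAndPartition("topicA", 1), TopicAndPartition("topicA", 2),
    TopicAndPartition("topicB", 1), TopicAndPartition("topicB", 2),
    TopicAndPartition("topicB", 3))

  val grouper = new KafkaDefaultGrouper
  // taskNum is the number of stream producers; taskIndex selects one producer's share
  val producer0 = grouper.group(taskNum = 2, taskIndex = 0, topicAndPartitions)
  val producer1 = grouper.group(taskNum = 2, taskIndex = 1, topicAndPartitions)
  // producer0: (topicA,1), (topicB,1), (topicB,3)
  // producer1: (topicA,2), (topicB,2)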

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
new file mode 100644
index 0000000..e2f5203
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.grouper
+
+import kafka.common.TopicAndPartition
+
+/**
+ * this trait defines how kafka.common.TopicAndPartition are dispatched to gearpump tasks
+ */
+trait KafkaGrouper {
+  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+    : Array[TopicAndPartition]
+}
+
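
The trait above is the extension point for partition assignment. As an illustration only (not part of this patch), a hypothetical grouper that keeps all partitions of a topic on the same task by hashing the topic name could look like:

  import kafka.common.TopicAndPartition
  import org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper

  // hypothetical example: co-locate all partitions of a topic on one task
  class TopicHashGrouper extends KafkaGrouper {
    def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
      : Array[TopicAndPartition] = {
      // normalize hashCode into [0, taskNum) so negative hashes are handled
      topicAndPartitions.filter { tp =>
        ((tp.topic.hashCode % taskNum) + taskNum) % taskNum == taskIndex
      }
    }
  }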

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
deleted file mode 100644
index 2b00414..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka
-
-import com.twitter.bijection.Injection
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.streaming.MockUtil
-
-class KafkaSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  val dataGen = for {
-    topic <- Gen.alphaStr
-    key <- Gen.alphaStr
-    msg <- Gen.alphaStr
-  } yield (topic, Injection[String, Array[Byte]](key), Injection[String, Array[Byte]](msg))
-
-  property("KafkaSink write should send producer record") {
-    forAll(dataGen) {
-      (data: (String, Array[Byte], Array[Byte])) =>
-        val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-        val (topic, key, msg) = data
-        val kafkaSink = new KafkaSink(() => producer, topic)
-        kafkaSink.write(Message((key, msg)))
-        verify(producer).send(MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](
-          r => r.topic == topic && (r.key sameElements key) && (r.value sameElements msg)))
-        kafkaSink.write(Message(msg))
-        verify(producer).send(MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](
-          r => r.topic() == topic && (r.key == null) && (r.value() sameElements msg)
-        ))
-        kafkaSink.close()
-    }
-  }
-
-  property("KafkaSink close should close kafka producer") {
-    val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-    val kafkaSink = new KafkaSink(() => producer, "topic")
-    kafkaSink.close()
-    verify(producer).close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
deleted file mode 100644
index 7c804f7..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka
-
-import scala.util.{Failure, Success}
-
-import com.twitter.bijection.Injection
-import kafka.common.TopicAndPartition
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage}
-import io.gearpump.streaming.kafka.lib.{KafkaOffsetManager, KafkaSourceConfig}
-import io.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
-import io.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter}
-
-class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  val startTimeGen = Gen.choose[Long](0L, 1000L)
-  val offsetGen = Gen.choose[Long](0L, 1000L)
-
-  property("KafkaSource open sets consumer to earliest offset") {
-    val topicAndPartition = mock[TopicAndPartition]
-    val fetchThread = mock[FetchThread]
-    val offsetManager = mock[KafkaOffsetManager]
-    val messageDecoder = mock[MessageDecoder]
-    val timestampFilter = mock[TimeStampFilter]
-    val offsetStorageFactory = mock[OffsetStorageFactory]
-    val kafkaConfig = mock[KafkaSourceConfig]
-    val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
-      timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
-
-    kafkaSource.setStartTime(None)
-
-    verify(fetchThread).start()
-    verify(fetchThread, never()).setStartOffset(anyObject[TopicAndPartition](), anyLong())
-  }
-
-  property("KafkaSource open should not set consumer start offset if offset storage is empty") {
-    forAll(startTimeGen) { (startTime: Long) =>
-      val offsetManager = mock[KafkaOffsetManager]
-      val topicAndPartition = mock[TopicAndPartition]
-      val fetchThread = mock[FetchThread]
-      val messageDecoder = mock[MessageDecoder]
-      val timestampFilter = mock[TimeStampFilter]
-      val offsetStorageFactory = mock[OffsetStorageFactory]
-      val kafkaConfig = mock[KafkaSourceConfig]
-      val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
-        timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
-
-      when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(StorageEmpty))
-
-      source.setStartTime(Some(startTime))
-      verify(fetchThread, never()).setStartOffset(anyObject[TopicAndPartition](), anyLong())
-      verify(fetchThread).start()
-
-      when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new RuntimeException))
-      intercept[RuntimeException] {
-        source.setStartTime(Some(startTime))
-      }
-      source.close()
-    }
-  }
-
-  property("KafkaSource open should set consumer start offset if offset storage is not empty") {
-    forAll(startTimeGen, offsetGen) {
-      (startTime: Long, offset: Long) =>
-        val offsetManager = mock[KafkaOffsetManager]
-        val topicAndPartition = mock[TopicAndPartition]
-        val fetchThread = mock[FetchThread]
-        val messageDecoder = mock[MessageDecoder]
-        val timestampFilter = mock[TimeStampFilter]
-        val offsetStorageFactory = mock[OffsetStorageFactory]
-        val kafkaConfig = mock[KafkaSourceConfig]
-        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
-          timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
-
-        when(offsetManager.resolveOffset(startTime)).thenReturn(Success(offset))
-
-        source.setStartTime(Some(startTime))
-        verify(fetchThread).setStartOffset(topicAndPartition, offset)
-        verify(fetchThread).start()
-
-        when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new RuntimeException))
-        intercept[RuntimeException] {
-          source.setStartTime(Some(startTime))
-        }
-        source.close()
-    }
-  }
-
-  property("KafkaSource read should return number of messages in best effort") {
-    val kafkaMsgGen = for {
-      topic <- Gen.alphaStr
-      partition <- Gen.choose[Int](0, 1000)
-      offset <- Gen.choose[Long](0L, 1000L)
-      key = None
-      msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
-    } yield KafkaMessage(TopicAndPartition(topic, partition), offset, key, msg)
-    val msgQueueGen = Gen.containerOf[Array, KafkaMessage](kafkaMsgGen)
-    forAll(msgQueueGen) {
-      (msgQueue: Array[KafkaMessage]) =>
-        val offsetManager = mock[KafkaOffsetManager]
-        val fetchThread = mock[FetchThread]
-        val messageDecoder = mock[MessageDecoder]
-
-        val timestampFilter = mock[TimeStampFilter]
-        val offsetStorageFactory = mock[OffsetStorageFactory]
-        val kafkaConfig = mock[KafkaSourceConfig]
-        val offsetManagers = msgQueue.map(_.topicAndPartition -> offsetManager).toMap
-
-        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
-          timestampFilter, Some(fetchThread), offsetManagers)
-
-        if (msgQueue.isEmpty) {
-          when(fetchThread.poll).thenReturn(None)
-          source.read() shouldBe null
-        } else {
-          msgQueue.indices.foreach { i =>
-            val message = Message(msgQueue(i).msg)
-            when(fetchThread.poll).thenReturn(Option(msgQueue(i)))
-            when(messageDecoder.fromBytes(anyObject[Array[Byte]])).thenReturn(message)
-            when(offsetManager.filter(anyObject[(Message, Long)])).thenReturn(Some(message))
-            when(timestampFilter.filter(anyObject[Message], anyLong())).thenReturn(Some(message))
-
-            source.read shouldBe message
-          }
-        }
-        source.close()
-    }
-  }
-
-  property("KafkaSource close should close all offset managers") {
-    val offsetManager = mock[KafkaOffsetManager]
-    val topicAndPartition = mock[TopicAndPartition]
-    val fetchThread = mock[FetchThread]
-    val timestampFilter = mock[TimeStampFilter]
-    val messageDecoder = mock[MessageDecoder]
-    val offsetStorageFactory = mock[OffsetStorageFactory]
-    val kafkaConfig = mock[KafkaSourceConfig]
-    val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
-      timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
-    source.close()
-    verify(offsetManager).close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
deleted file mode 100644
index f3b5425..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import com.twitter.bijection.Injection
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
-  property("DefaultMessageDecoder should keep the original bytes data in Message") {
-    val decoder = new DefaultMessageDecoder()
-    forAll(Gen.alphaStr) { (s: String) =>
-      val bytes = Injection[String, Array[Byte]](s)
-      decoder.fromBytes(bytes).msg shouldBe bytes
-    }
-  }
-}
-
-class StringMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
-  property("StringMessageDecoder should decode original bytes data into string") {
-    val decoder = new StringMessageDecoder()
-    forAll(Gen.alphaStr) { (s: String) =>
-      val bytes = Injection[String, Array[Byte]](s)
-      decoder.fromBytes(bytes).msg shouldBe s
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
deleted file mode 100644
index c762b06..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import scala.util.{Failure, Success}
-
-import com.twitter.bijection.Injection
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.streaming.transaction.api.OffsetStorage
-import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
-
-class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  val timeStampGen = Gen.choose[Long](0L, 1000L)
-  val messageGen = for {
-    msg <- Gen.alphaStr
-    time <- timeStampGen
-  } yield Message(msg, time)
-
-  val messageAndOffsetsGen = Gen.listOf[Message](messageGen).map(_.zipWithIndex)
-
-  property("KafkaOffsetManager should append offset to storage in monotonically" +
-    " increasing time order") {
-    forAll(messageAndOffsetsGen) { (messageAndOffsets: List[(Message, Int)]) =>
-      val offsetStorage = mock[OffsetStorage]
-      val offsetManager = new KafkaOffsetManager(offsetStorage)
-      messageAndOffsets.foldLeft(0L) { (max, messageAndOffset) =>
-        val (message, offset) = messageAndOffset
-        offsetManager.filter((message, offset.toLong)) shouldBe Option(message)
-        if (message.timestamp > max) {
-          val newMax = message.timestamp
-          verify(offsetStorage).append(newMax, Injection[Long, Array[Byte]](offset.toLong))
-          newMax
-        } else {
-          verifyZeroInteractions(offsetStorage)
-          max
-        }
-      }
-      offsetManager.close()
-    }
-  }
-
-  val minTimeStampGen = Gen.choose[Long](0L, 500L)
-  val maxTimeStampGen = Gen.choose[Long](500L, 1000L)
-  property("KafkaOffsetManager resolveOffset should " +
-    "report StorageEmpty failure when storage is empty") {
-    forAll(timeStampGen) { (time: Long) =>
-      val offsetStorage = mock[OffsetStorage]
-      val offsetManager = new KafkaOffsetManager(offsetStorage)
-      when(offsetStorage.lookUp(time)).thenReturn(Failure(StorageEmpty))
-      offsetManager.resolveOffset(time) shouldBe Failure(StorageEmpty)
-
-      doThrow(new RuntimeException).when(offsetStorage).lookUp(time)
-      intercept[RuntimeException] {
-        offsetManager.resolveOffset(time)
-      }
-      offsetManager.close()
-    }
-  }
-
-  val offsetGen = Gen.choose[Long](0L, 1000L)
-  property("KafkaOffsetManager resolveOffset should return a valid" +
-    " offset when storage is not empty") {
-    forAll(timeStampGen, minTimeStampGen, maxTimeStampGen, offsetGen) {
-      (time: Long, min: Long, max: Long, offset: Long) =>
-        val offsetStorage = mock[OffsetStorage]
-        val offsetManager = new KafkaOffsetManager(offsetStorage)
-        if (time < min) {
-          when(offsetStorage.lookUp(time)).thenReturn(Failure(
-            Underflow(Injection[Long, Array[Byte]](min))))
-          offsetManager.resolveOffset(time) shouldBe Success(min)
-        } else if (time > max) {
-          when(offsetStorage.lookUp(time)).thenReturn(Failure(
-            Overflow(Injection[Long, Array[Byte]](max))))
-          offsetManager.resolveOffset(time) shouldBe Success(max)
-        } else {
-          when(offsetStorage.lookUp(time)).thenReturn(Success(Injection[Long, Array[Byte]](offset)))
-          offsetManager.resolveOffset(time) shouldBe Success(offset)
-        }
-
-        doThrow(new RuntimeException).when(offsetStorage).lookUp(time)
-        intercept[RuntimeException] {
-          offsetManager.resolveOffset(time)
-        }
-        offsetManager.close()
-    }
-  }
-
-  property("KafkaOffsetManager close should close offset storage") {
-    val offsetStorage = mock[OffsetStorage]
-    val offsetManager = new KafkaOffsetManager(offsetStorage)
-    offsetManager.close()
-    verify(offsetStorage).close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
deleted file mode 100644
index af23c12..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-// TODO: Fix the UT failure!
-
-// class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-//  val minTimeGen = Gen.choose[Long](1L, 500L)
-//  val maxTimeGen = Gen.choose[Long](500L, 999L)
-//
-//  property("KafkaStorage lookup time should report StorageEmpty if storage is empty") {
-//    forAll { (time: Long, topic: String) =>
-//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//      val getConsumer = () => mock[KafkaConsumer]
-//      val connectZk = () => mock[ZkClient]
-//      val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(),
-//        connectZk())
-//      storage.lookUp(time) shouldBe Failure(StorageEmpty)
-//    }
-//  }
-//
-//  property("KafkaStorage lookup time should return data or report failure if storage not empty") {
-//    forAll(minTimeGen, maxTimeGen, Gen.alphaStr) {(minTime: Long, maxTime: Long, topic: String) =>
-//      val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) =>
-//        val offset = index.toLong
-//        time -> offset
-//      }
-//      val timeAndOffsetsMap = timeAndOffsets.toMap
-//      val data = timeAndOffsets.map {
-//        case (time, offset) =>
-//          new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)),
-//            Injection[Long, Array[Byte]](offset))
-//      }.toList
-//
-//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//      val consumer = mock[KafkaConsumer]
-//      val getConsumer = () => consumer
-//      val connectZk = () => mock[ZkClient]
-//
-//      val hasNexts = List.fill(data.tail.size)(true) :+ false
-//      when(consumer.hasNext).thenReturn(true, hasNexts:_*)
-//      when(consumer.next).thenReturn(data.head, data.tail:_*)
-//
-//      val storage = new KafkaStorage(topic, topicExists = true, producer,
-//        getConsumer(), connectZk())
-//      forAll(Gen.choose[Long](minTime, maxTime)) {
-//        time =>
-//          storage.lookUp(time) match {
-//            case Success(array) =>
-//              array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time)))
-//            case Failure(e)     => fail("time in range should return Success with value")
-//          }
-//      }
-//
-//      forAll(Gen.choose[Long](0L, minTime - 1)) {
-//        time =>
-//          storage.lookUp(time) match {
-//            case Failure(e) => e shouldBe a [Underflow]
-//              e.asInstanceOf[Underflow].min should equal
-//                (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime)))
-//            case Success(_) => fail("time less than min should return Underflow failure")
-//          }
-//      }
-//
-//      forAll(Gen.choose[Long](maxTime + 1, 1000L)) {
-//        time =>
-//          storage.lookUp(time) match {
-//            case Failure(e) => e shouldBe a [Overflow]
-//              e.asInstanceOf[Overflow].max should equal
-//                (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime)))
-//            case Success(_) => fail("time larger than max should return Overflow failure")
-//          }
-//      }
-//    }
-//  }
-//
-//  property("KafkaStorage append should send data to Kafka") {
-//    forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000),
-//      Gen.alphaStr, Gen.oneOf(true, false)) {
-//      (time: Long, offset: Long, topic: String, topicExists: Boolean) =>
-//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//      val getConsumer = () => mock[KafkaConsumer]
-//      val connectZk = () => mock[ZkClient]
-//      val storage = new KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk())
-//      val offsetBytes = Injection[Long, Array[Byte]](offset)
-//      storage.append(time, offsetBytes)
-//      verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]())
-//    }
-//  }
-//
-//  val topicAndPartitionGen = for {
-//    topic <- Gen.alphaStr
-//    partition <- Gen.choose[Int](0, 100)
-//  } yield TopicAndPartition(topic, partition)
-//  property("KafkaStorage should load data from Kafka") {
-//    val kafkaMsgGen = for {
-//      timestamp <- Gen.choose[Long](1L, 1000L)
-//      offset    <- Gen.choose[Long](0L, 1000L)
-//    } yield (timestamp, Injection[Long, Array[Byte]](offset))
-//    val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen)
-//
-//    val topicExistsGen = Gen.oneOf(true, false)
-//
-//    forAll(topicAndPartitionGen, msgListGen) {
-//      (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) =>
-//        val producer=  mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//        val consumer = mock[KafkaConsumer]
-//        val getConsumer = () => consumer
-//        val connectZk = () => mock[ZkClient]
-//        val kafkaStorage = new KafkaStorage(topicAndPartition.topic,
-//          topicExists = true, producer, getConsumer(), connectZk())
-//          msgList match {
-//            case Nil =>
-//              when(consumer.hasNext).thenReturn(false)
-//            case list =>
-//              val hasNexts = List.fill(list.tail.size)(true) :+ false
-//              val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) =>
-//                KafkaMessage(topicAndPartition, index.toLong,
-//                  Some(Injection[Long, Array[Byte]](timestamp)), bytes)
-//              }
-//              when(consumer.hasNext).thenReturn(true, hasNexts: _*)
-//              when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*)
-//          }
-//          kafkaStorage.load(consumer) shouldBe msgList
-//    }
-//  }
-//
-//  property("KafkaStorage should not get consumer when topic doesn't exist") {
-//    forAll(Gen.alphaStr) { (topic: String) =>
-//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//      val getConsumer = mock[() => KafkaConsumer]
-//      val connectZk = () => mock[ZkClient]
-//      val kafkaStorage = new KafkaStorage(topic,
-//        topicExists = false, producer, getConsumer(), connectZk())
-//      verify(getConsumer, never()).apply()
-//      kafkaStorage.close()
-//    }
-//  }
-//
-//  property("KafkaStorage should fail to load invalid KafkaMessage") {
-//    val invalidKafkaMsgGen = for {
-//      tp <- topicAndPartitionGen
-//      offset <- Gen.choose[Long](1L, 1000L)
-//      timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))),
-//        None)
-//      msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
-//    } yield KafkaMessage(tp, offset, timestamp, msg)
-//    forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) =>
-//      val consumer = mock[KafkaConsumer]
-//      val getConsumer = () => consumer
-//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//      val connectZk = () => mock[ZkClient]
-//      val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic,
-//        topicExists = true, producer, getConsumer(), connectZk())
-//      when(consumer.hasNext).thenReturn(true, false)
-//      when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg)
-//      Try(kafkaStorage.load(consumer)).isFailure shouldBe true
-//    }
-//  }
-//
-//  property("KafkaStorage close should close kafka producer and delete topic") {
-//    val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//    val getConsumer = () => mock[KafkaConsumer]
-//    val zkClient = mock[ZkClient]
-//    val connectZk = () => zkClient
-//    val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk())
-//    kafkaStorage.close()
-//    verify(producer).close()
-//    verify(zkClient).createPersistent(anyString(), anyString())
-//  }
-// }