Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:28 UTC
[18/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala
new file mode 100644
index 0000000..339711b
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka
+
+import java.util.Properties
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success}
+
+import kafka.common.TopicAndPartition
+import org.slf4j.Logger
+
+import org.apache.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage}
+import org.apache.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaOffsetManager, KafkaSourceConfig, KafkaUtil}
+import org.apache.gearpump.streaming.source.DefaultTimeStampFilter
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
+import org.apache.gearpump.streaming.transaction.api._
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+object KafkaSource {
+ private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource])
+}
+
+/**
+ * Kafka source connector that pulls a batch of messages (`kafka.consumer.emit.batch.size`)
+ * from multiple Kafka TopicAndPartitions in a round-robin way.
+ *
+ * This is a TimeReplayableSource which is able to replay messages given a start time.
+ * Each kafka message is tagged with a timestamp by
+ * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp)
+ * mapping is stored to a [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]].
+ * On recovery, we can retrieve the previously stored offset from the
+ * [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] by timestamp and start
+ * reading from there.
+ *
+ * Each kafka message is wrapped into a gearpump [[org.apache.gearpump.Message]] and further filtered by a
+ * [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]]
+ * such that obsolete messages are dropped.
+ *
+ * @param config kafka source config
+ * @param offsetStorageFactory factory to build [[OffsetStorage]]
+ * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes
+ * @param timestampFilter filters out messages based on timestamp
+ * @param fetchThread fetches messages and puts them on an in-memory queue
+ * @param offsetManagers manages offset-to-timestamp storage for each kafka.common.TopicAndPartition
+ */
+class KafkaSource(
+ config: KafkaSourceConfig,
+ offsetStorageFactory: OffsetStorageFactory,
+ messageDecoder: MessageDecoder = new DefaultMessageDecoder,
+ timestampFilter: TimeStampFilter = new DefaultTimeStampFilter,
+ private var fetchThread: Option[FetchThread] = None,
+ private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = {
+ Map.empty[TopicAndPartition, KafkaOffsetManager]
+ }) extends TimeReplayableSource {
+ import org.apache.gearpump.streaming.kafka.KafkaSource._
+
+ private var startTime: Option[TimeStamp] = None
+
+ /**
+ * Constructs a KafkaSource from consumer properties and a comma-separated string of topics.
+ *
+ * @param topics comma-separated string of topics
+ * @param properties kafka consumer config
+ * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
+ * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
+ *
+ */
+ def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory) = {
+ this(KafkaSourceConfig(properties).withConsumerTopics(topics), offsetStorageFactory)
+ }
+ /**
+ * Constructs a KafkaSource from consumer properties and a comma-separated topics string,
+ * with custom message decoding and timestamp filtering.
+ *
+ * @param topics comma-separated string of topics
+ * @param properties kafka consumer config
+ * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
+ * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
+ * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes
+ * @param timestampFilter filters out messages based on timestamp
+ */
+ def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory,
+ messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter) = {
+ this(KafkaSourceConfig(properties)
+ .withConsumerTopics(topics), offsetStorageFactory,
+ messageDecoder, timestampFilter)
+ }
+
+ /**
+ * Constructs a KafkaSource from a `zookeeper.connect` string and a comma-separated string
+ * of topics.
+ *
+ * @param topics comma-separated string of topics
+ * @param zkConnect kafka consumer config `zookeeper.connect`
+ * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
+ * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
+ */
+ def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory) =
+ this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory)
+
+ /**
+ * Constructs a KafkaSource from a `zookeeper.connect` string and a comma-separated topics
+ * string, with custom message decoding and timestamp filtering.
+ *
+ * @param topics comma-separated string of topics
+ * @param zkConnect kafka consumer config `zookeeper.connect`
+ * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
+ * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
+ * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes
+ * @param timestampFilter filters out messages based on timestamp
+ */
+ def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory,
+ messageDecoder: MessageDecoder,
+ timestampFilter: TimeStampFilter) = {
+ this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory,
+ messageDecoder, timestampFilter)
+ }
+
+ LOG.debug(s"assigned ${offsetManagers.keySet}")
+
+ private[kafka] def setStartTime(startTime: Option[TimeStamp]): Unit = {
+ this.startTime = startTime
+ fetchThread.foreach { fetch =>
+ this.startTime.foreach { time =>
+ offsetManagers.foreach { case (tp, offsetManager) =>
+ offsetManager.resolveOffset(time) match {
+ case Success(offset) =>
+ LOG.debug(s"set start offset to $offset for $tp")
+ fetch.setStartOffset(tp, offset)
+ case Failure(StorageEmpty) =>
+ LOG.debug(s"no previous TimeStamp stored")
+ case Failure(e) => throw e
+ }
+ }
+ }
+ fetch.setDaemon(true)
+ fetch.start()
+ }
+ }
+
+ override def open(context: TaskContext, startTime: TimeStamp): Unit = {
+ import context.{appId, appName, parallelism, taskId}
+
+ val topics = config.getConsumerTopics
+ val grouper = config.getGrouper
+ val consumerConfig = config.consumerConfig
+ val topicAndPartitions = grouper.group(parallelism, taskId.index,
+ KafkaUtil.getTopicAndPartitions(KafkaUtil.connectZookeeper(consumerConfig)(), topics))
+ this.fetchThread = Some(FetchThread(topicAndPartitions, config.getFetchThreshold,
+ config.getFetchSleepMS, config.getConsumerStartOffset, consumerConfig))
+ this.offsetManagers = topicAndPartitions.map { tp =>
+ val storageTopic = s"app${appId}_${appName}_${tp.topic}_${tp.partition}"
+ val storage = offsetStorageFactory.getOffsetStorage(storageTopic)
+ tp -> new KafkaOffsetManager(storage)
+ }.toMap
+
+ setStartTime(Option(startTime))
+ }
+
+ override def read(): Message = {
+ fetchThread.flatMap(_.poll.flatMap(filterMessage)).orNull
+ }
+
+ private def filterMessage(kafkaMsg: KafkaMessage): Option[Message] = {
+ val msgOpt = offsetManagers(kafkaMsg.topicAndPartition)
+ .filter(messageDecoder.fromBytes(kafkaMsg.msg) -> kafkaMsg.offset)
+ msgOpt.flatMap { msg =>
+ startTime match {
+ case None =>
+ Some(msg)
+ case Some(time) =>
+ timestampFilter.filter(msg, time)
+ }
+ }
+ }
+
+ override def close(): Unit = {
+ offsetManagers.foreach(_._2.close())
+ }
+}
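
A minimal usage sketch for the source above (not part of the commit; endpoints and topic
names are placeholders), wiring it up through the KafkaStorageFactory added later in this
change:

    import org.apache.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}

    // offsets are checkpointed to a Kafka-backed storage (see KafkaStorage below)
    val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092")

    // simplest constructor: comma-separated topics plus `zookeeper.connect`;
    // DefaultMessageDecoder and DefaultTimeStampFilter are used by default
    val source = new KafkaSource("topic1,topic2", "localhost:2181", offsetStorageFactory)

    // the streaming framework drives the lifecycle: open(context, startTime)
    // resolves start offsets and starts the fetch thread; read() polls one
    // decoded, filtered Message at a time; close() releases the offset storage.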
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala
new file mode 100644
index 0000000..8748999
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka
+
+import java.util.Properties
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+import com.twitter.bijection.Injection
+import kafka.api.OffsetRequest
+import kafka.consumer.ConsumerConfig
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.slf4j.Logger
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.kafka.lib.KafkaUtil
+import org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
+import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
+import org.apache.gearpump.streaming.transaction.api.{OffsetStorage, OffsetStorageFactory}
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Factory that builds [[KafkaStorage]]
+ *
+ * @param consumerProps kafka consumer config
+ * @param producerProps kafka producer config
+ */
+class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties)
+ extends OffsetStorageFactory {
+
+ /**
+ * Creates consumer config properties with `zookeeper.connect` set to zkConnect
+ * and producer config properties with `bootstrap.servers` set to bootstrapServers
+ *
+ * @param zkConnect kafka consumer config `zookeeper.connect`
+ * @param bootstrapServers kafka producer config `bootstrap.servers`
+ */
+ def this(zkConnect: String, bootstrapServers: String) =
+ this(KafkaUtil.buildConsumerConfig(zkConnect), KafkaUtil.buildProducerConfig(bootstrapServers))
+
+ override def getOffsetStorage(dir: String): OffsetStorage = {
+ val topic = dir
+ val consumerConfig = new ConsumerConfig(consumerProps)
+ val getConsumer = () => KafkaConsumer(topic, 0, OffsetRequest.EarliestTime, consumerConfig)
+ new KafkaStorage(topic, KafkaUtil.createKafkaProducer[Array[Byte], Array[Byte]](
+ producerProps, new ByteArraySerializer, new ByteArraySerializer),
+ getConsumer(), KafkaUtil.connectZookeeper(consumerConfig)())
+ }
+}
+
+object KafkaStorage {
+ private val LOG: Logger = LogUtil.getLogger(classOf[KafkaStorage])
+}
+
+/**
+ * Stores offset-timestamp mapping to kafka
+ *
+ * @param topic kafka store topic
+ * @param producer kafka producer
+ * @param getConsumer function to get kafka consumer
+ * @param connectZk function to connect zookeeper
+ */
+class KafkaStorage private[kafka](
+ topic: String,
+ producer: KafkaProducer[Array[Byte], Array[Byte]],
+ getConsumer: => KafkaConsumer,
+ connectZk: => ZkClient)
+ extends OffsetStorage {
+
+ private lazy val consumer = getConsumer
+
+ private val dataByTime: List[(TimeStamp, Array[Byte])] = {
+ if (KafkaUtil.topicExists(connectZk, topic)) {
+ load(consumer)
+ } else {
+ List.empty[(TimeStamp, Array[Byte])]
+ }
+ }
+
+ /**
+ * Offsets with timestamps less than `time` have already been processed by the system,
+ * so on replay we look up the storage for the first offset whose timestamp is larger
+ * than or equal to `time`.
+ *
+ * @param time the timestamp to look up for the earliest unprocessed offset
+ * @return the earliest unprocessed offset if `time` is in the range, otherwise failure
+ */
+ override def lookUp(time: TimeStamp): Try[Array[Byte]] = {
+ if (dataByTime.isEmpty) {
+ Failure(StorageEmpty)
+ } else {
+ val min = dataByTime.head
+ val max = dataByTime.last
+ if (time < min._1) {
+ Failure(Underflow(min._2))
+ } else if (time > max._1) {
+ Failure(Overflow(max._2))
+ } else {
+ Success(dataByTime.find(_._1 >= time).get._2)
+ }
+ }
+ }
+
+ override def append(time: TimeStamp, offset: Array[Byte]): Unit = {
+ val message = new ProducerRecord[Array[Byte], Array[Byte]](
+ topic, 0, Injection[Long, Array[Byte]](time), offset)
+ producer.send(message)
+ }
+
+ override def close(): Unit = {
+ producer.close()
+ KafkaUtil.deleteTopic(connectZk, topic)
+ }
+
+ private[kafka] def load(consumer: KafkaConsumer): List[(TimeStamp, Array[Byte])] = {
+ val messagesBuilder = new mutable.ArrayBuilder.ofRef[(TimeStamp, Array[Byte])]
+ while (consumer.hasNext) {
+ val kafkaMsg = consumer.next
+ val key = kafkaMsg.key.getOrElse(
+ throw new RuntimeException("offset key should not be null"))
+ Injection.invert[TimeStamp, Array[Byte]](key) match {
+ case Success(time) =>
+ messagesBuilder += (time -> kafkaMsg.msg)
+ case Failure(e) => throw e
+ }
+ }
+ consumer.close()
+ messagesBuilder.result().toList
+ }
+}
\ No newline at end of file
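
The lookUp contract above, illustrated on a hypothetical storage holding three
(timestamp, offset-bytes) entries (o0, o5, o9 stand in for encoded offsets):

    // dataByTime = List((100L, o0), (200L, o5), (300L, o9))
    //
    // lookUp(50)   => Failure(Underflow(o0))   time before the min stored timestamp
    // lookUp(150)  => Success(o5)              first entry with timestamp >= 150
    // lookUp(300)  => Success(o9)              exact match on the max timestamp
    // lookUp(400)  => Failure(Overflow(o9))    time past the max stored timestamp
    //
    // an empty storage answers every lookUp with Failure(StorageEmpty)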
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
new file mode 100644
index 0000000..5f48b43
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka.dsl
+
+import java.util.Properties
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl
+import org.apache.gearpump.streaming.kafka.KafkaSink
+
+class KafkaDSLSink[T](stream: dsl.Stream[T]) {
+
+ /** Create a Kafka DSL Sink */
+ def writeToKafka(
+ topic: String,
+ bootstrapServers: String,
+ parallelism: Int,
+ description: String): dsl.Stream[T] = {
+ stream.sink(new KafkaSink(topic, bootstrapServers), parallelism, UserConfig.empty, description)
+ }
+
+ def writeToKafka(
+ topic: String,
+ properties: Properties,
+ parallelism: Int,
+ description: String): dsl.Stream[T] = {
+ stream.sink(new KafkaSink(topic, properties), parallelism, UserConfig.empty, description)
+ }
+}
+
+object KafkaDSLSink {
+
+ import scala.language.implicitConversions
+
+ implicit def streamToKafkaDSLSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] = {
+ new KafkaDSLSink[T](stream)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
new file mode 100644
index 0000000..0275966
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka.dsl
+
+import java.util.Properties
+
+import org.apache.gearpump.streaming.dsl
+import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.kafka.KafkaSource
+import org.apache.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig}
+import org.apache.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter}
+
+object KafkaDSLUtil {
+ def createStream[T](
+ app: StreamApp,
+ parallelism: Int,
+ description: String,
+ kafkaConfig: KafkaSourceConfig,
+ offsetStorageFactory: OffsetStorageFactory,
+ messageDecoder: MessageDecoder = new DefaultMessageDecoder): dsl.Stream[T] = {
+ app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder),
+ parallelism, description)
+ }
+
+ def createStream[T](
+ app: StreamApp,
+ parallelism: Int,
+ description: String,
+ topics: String,
+ zkConnect: String,
+ offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
+ app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory),
+ parallelism, description)
+ }
+
+ def createStream[T](
+ app: StreamApp,
+ parallelism: Int,
+ description: String,
+ topics: String,
+ zkConnect: String,
+ offsetStorageFactory: OffsetStorageFactory,
+ messageDecoder: MessageDecoder,
+ timestampFilter: TimeStampFilter): dsl.Stream[T] = {
+ app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory,
+ messageDecoder, timestampFilter), parallelism, description)
+ }
+
+ def createStream[T](
+ app: StreamApp,
+ parallelism: Int,
+ description: String,
+ topics: String,
+ properties: Properties,
+ offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
+ app.source[T](new KafkaSource(topics, properties, offsetStorageFactory),
+ parallelism, description)
+ }
+
+ def createStream[T](
+ app: StreamApp,
+ topics: String,
+ parallelism: Int,
+ description: String,
+ properties: Properties,
+ offsetStorageFactory: OffsetStorageFactory,
+ messageDecoder: MessageDecoder,
+ timestampFilter: TimeStampFilter): dsl.Stream[T] = {
+ app.source[T](new KafkaSource(topics, properties, offsetStorageFactory,
+ messageDecoder, timestampFilter), parallelism, description)
+ }
+}
+
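Taken together with the implicit conversion in KafkaDSLSink above, these helpers compose
into a read-from-Kafka, write-to-Kafka pipeline. A sketch (not part of the commit), assuming
local endpoints and placeholder topic names:

    import org.apache.gearpump.streaming.dsl.StreamApp
    import org.apache.gearpump.streaming.kafka.KafkaStorageFactory
    import org.apache.gearpump.streaming.kafka.dsl.KafkaDSLUtil
    import org.apache.gearpump.streaming.kafka.dsl.KafkaDSLSink._

    def pipeline(app: StreamApp): Unit = {
      val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092")
      // source: 1 reader task on "inputTopic"; sink: 1 writer task on "outputTopic"
      KafkaDSLUtil
        .createStream[Array[Byte]](app, 1, "kafka-in", "inputTopic", "localhost:2181",
          offsetStorageFactory)
        .writeToKafka("outputTopic", "localhost:9092", 1, "kafka-out")
    }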
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
new file mode 100644
index 0000000..ea7e8d1
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import scala.util.{Failure, Success}
+
+import com.twitter.bijection.Injection
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.transaction.api.MessageDecoder
+
+class DefaultMessageDecoder extends MessageDecoder {
+ override def fromBytes(bytes: Array[Byte]): Message = {
+ Message(bytes, System.currentTimeMillis())
+ }
+}
+
+class StringMessageDecoder extends MessageDecoder {
+ override def fromBytes(bytes: Array[Byte]): Message = {
+ Injection.invert[String, Array[Byte]](bytes) match {
+ case Success(s) => Message(s, System.currentTimeMillis())
+ case Failure(e) => throw e
+ }
+ }
+}
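
DefaultMessageDecoder stamps messages with wall-clock time, so replay is only as precise as
processing time. A hypothetical decoder (not part of this commit) that extracts an event time
from the payload instead, assuming UTF-8 payloads of the form "<timestampMs>,<body>":

    import org.apache.gearpump.Message
    import org.apache.gearpump.streaming.transaction.api.MessageDecoder

    class EventTimeMessageDecoder extends MessageDecoder {
      override def fromBytes(bytes: Array[Byte]): Message = {
        val s = new String(bytes, "UTF-8")
        val (time, body) = s.span(_ != ',')   // split at the first comma
        Message(body.drop(1), time.toLong)    // drop(1) removes the comma itself
      }
    }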
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
new file mode 100644
index 0000000..88f509b
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import scala.util.{Failure, Success, Try}
+
+import com.twitter.bijection.Injection
+import org.slf4j.Logger
+
+import org.apache.gearpump._
+import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
+import org.apache.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage}
+import org.apache.gearpump.util.LogUtil
+
+object KafkaOffsetManager {
+ private val LOG: Logger = LogUtil.getLogger(classOf[KafkaOffsetManager])
+}
+
+private[kafka] class KafkaOffsetManager(storage: OffsetStorage) extends OffsetManager {
+ import org.apache.gearpump.streaming.kafka.lib.KafkaOffsetManager._
+
+ var maxTime: TimeStamp = 0L
+
+ override def filter(messageAndOffset: (Message, Long)): Option[Message] = {
+ val (message, offset) = messageAndOffset
+ if (message.timestamp > maxTime) {
+ maxTime = message.timestamp
+ storage.append(maxTime, Injection[Long, Array[Byte]](offset))
+ }
+ Some(message)
+ }
+
+ override def resolveOffset(time: TimeStamp): Try[Long] = {
+ storage.lookUp(time) match {
+ case Success(offset) => Injection.invert[Long, Array[Byte]](offset)
+ case Failure(Overflow(max)) =>
+ LOG.warn(s"start time larger than the max stored TimeStamp; set to max offset")
+ Injection.invert[Long, Array[Byte]](max)
+ case Failure(Underflow(min)) =>
+ LOG.warn(s"start time less than the min stored TimeStamp; set to min offset")
+ Injection.invert[Long, Array[Byte]](min)
+ case Failure(StorageEmpty) => Failure(StorageEmpty)
+ case Failure(e) => throw e
+ }
+ }
+
+ override def close(): Unit = {
+ storage.close()
+ }
+}
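
How filter and resolveOffset cooperate, on made-up offsets and timestamps:

    // filter is called on each (message, offset) pair in arrival order;
    // only monotonically increasing timestamps get appended to storage,
    // but every message passes through (filter always returns Some):
    //
    //   filter((Message("a", 100L), 0L))   // appends (100 -> 0)
    //   filter((Message("b",  90L), 1L))   // out-of-order: nothing appended
    //   filter((Message("c", 200L), 2L))   // appends (200 -> 2)
    //
    // on recovery, resolveOffset(150L) finds the first stored timestamp >= 150,
    // i.e. (200 -> 2), so replay restarts from offset 2; times outside the
    // stored range are clamped to the min/max offset with a warning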
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
new file mode 100644
index 0000000..ade414e
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import java.util.Properties
+
+import kafka.api.OffsetRequest
+import kafka.consumer.ConsumerConfig
+import org.slf4j.Logger
+
+import org.apache.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper}
+import org.apache.gearpump.util.LogUtil
+
+object KafkaSourceConfig {
+
+ val NAME = "kafka_config"
+
+ val ZOOKEEPER_CONNECT = "zookeeper.connect"
+ val GROUP_ID = "group.id"
+ val CONSUMER_START_OFFSET = "kafka.consumer.start.offset"
+ val CONSUMER_TOPICS = "kafka.consumer.topics"
+ val FETCH_THRESHOLD = "kafka.consumer.fetch.threshold"
+ val FETCH_SLEEP_MS = "kafka.consumer.fetch.sleep.ms"
+ val GROUPER_CLASS = "kafka.grouper.class"
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ def apply(consumerProps: Properties): KafkaSourceConfig = new KafkaSourceConfig(consumerProps)
+}
+
+/**
+ * Extends kafka.consumer.ConsumerConfig with specific config needed by
+ * [[org.apache.gearpump.streaming.kafka.KafkaSource]]
+ *
+ * @param consumerProps kafka consumer config
+ */
+class KafkaSourceConfig(val consumerProps: Properties = new Properties)
+ extends java.io.Serializable {
+ import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig._
+
+ if (!consumerProps.containsKey(ZOOKEEPER_CONNECT)) {
+ consumerProps.setProperty(ZOOKEEPER_CONNECT, "localhost:2181")
+ }
+
+ if (!consumerProps.containsKey(GROUP_ID)) {
+ consumerProps.setProperty(GROUP_ID, "gearpump")
+ }
+
+ def consumerConfig: ConsumerConfig = new ConsumerConfig(consumerProps)
+
+ /**
+ * Sets kafka consumer topics, separated by commas.
+ *
+ * @param topics comma-separated string
+ * @return new KafkaSourceConfig based on this but with
+ * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#CONSUMER_TOPICS]]
+ * set to given value
+ */
+ def withConsumerTopics(topics: String): KafkaSourceConfig = {
+ consumerProps.setProperty(CONSUMER_TOPICS, topics)
+ KafkaSourceConfig(consumerProps)
+ }
+
+ /**
+ * Returns a list of kafka consumer topics
+ */
+ def getConsumerTopics: List[String] = {
+ Option(consumerProps.getProperty(CONSUMER_TOPICS)).getOrElse("topic1").split(",").toList
+ }
+
+ /**
+ * Sets the sleep interval for when there are no more messages or the message buffer is full.
+ *
+ * Consumer.FetchThread will sleep for a while if no more messages or
+ * the incoming queue size is above the
+ * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]
+ *
+ * @param sleepMS sleep interval in milliseconds
+ * @return new KafkaSourceConfig based on this but with
+ * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_SLEEP_MS]]
+ * set to given value
+ */
+ def withFetchSleepMS(sleepMS: Int): KafkaSourceConfig = {
+ consumerProps.setProperty(FETCH_SLEEP_MS, s"$sleepMS")
+ KafkaSourceConfig(consumerProps)
+ }
+
+ /**
+ * Gets the sleep interval
+ *
+ * Consumer.FetchThread sleeps for a while if no more messages or
+ * the incoming queue is full (size is bigger than the
+ * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]])
+ *
+ * @return sleep interval in milliseconds
+ */
+ def getFetchSleepMS: Int = {
+ Option(consumerProps.getProperty(FETCH_SLEEP_MS)).getOrElse("100").toInt
+ }
+
+ /**
+ * Sets the fetch threshold.
+ *
+ * Consumer.FetchThread stops fetching new messages if its incoming queue
+ * size is above the threshold and starts again when the queue size is below it
+ *
+ * @param threshold queue size
+ * @return new KafkaSourceConfig based on this but with
+ * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]
+ * set to given value
+ */
+ def withFetchThreshold(threshold: Int): KafkaSourceConfig = {
+ consumerProps.setProperty(FETCH_THRESHOLD, s"$threshold")
+ KafkaSourceConfig(consumerProps)
+ }
+
+ /**
+ * Returns the fetch threshold.
+ *
+ * Consumer.FetchThread stops fetching new messages if
+ * its incoming queue size is above the threshold and starts again when the queue size is below it
+ *
+ * @return fetch threshold
+ */
+ def getFetchThreshold: Int = {
+ Option(consumerProps.getProperty(FETCH_THRESHOLD)).getOrElse("10000").toInt
+ }
+
+ /**
+ * Sets [[org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which
+ * defines how kafka.common.TopicAndPartitions are mapped to source tasks.
+ *
+ * @param className name of the factory class
+ * @return new KafkaSourceConfig based on this but with
+ * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#GROUPER_CLASS]]
+ * set to given value
+ */
+ def withGrouper(className: String): KafkaSourceConfig = {
+ consumerProps.setProperty(GROUPER_CLASS, className)
+ KafkaSourceConfig(consumerProps)
+ }
+
+ /**
+ * Returns [[org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which
+ * defines how kafka.common.TopicAndPartitions are mapped to source tasks
+ */
+ def getGrouper: KafkaGrouper = {
+ Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS))
+ .getOrElse(classOf[KafkaDefaultGrouper].getName)).newInstance().asInstanceOf[KafkaGrouper]
+ }
+
+ def withConsumerStartOffset(earliestOrLatest: Long): KafkaSourceConfig = {
+ consumerProps.setProperty(CONSUMER_START_OFFSET, s"$earliestOrLatest")
+ KafkaSourceConfig(consumerProps)
+ }
+
+ def getConsumerStartOffset: Long = {
+ Option(consumerProps.getProperty(CONSUMER_START_OFFSET))
+ .getOrElse(s"${OffsetRequest.EarliestTime}").toLong
+ }
+}
+
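The setters above enable a fluent configuration style; a sketch with illustrative values:

    import java.util.Properties
    import kafka.api.OffsetRequest
    import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig

    val props = new Properties()
    props.setProperty("zookeeper.connect", "localhost:2181")

    // each withXxx call updates the underlying Properties and
    // returns a fresh KafkaSourceConfig wrapping them
    val config = KafkaSourceConfig(props)
      .withConsumerTopics("topic1,topic2")
      .withFetchThreshold(5000)
      .withFetchSleepMS(200)
      .withConsumerStartOffset(OffsetRequest.LatestTime)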
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala
new file mode 100644
index 0000000..e8cf574
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import java.io.InputStream
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.consumer.ConsumerConfig
+import kafka.utils.{ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
+import org.apache.kafka.common.serialization.Serializer
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.LogUtil
+
+object KafkaUtil {
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ def getBroker(connectZk: => ZkClient, topic: String, partition: Int): Broker = {
+ val zkClient = connectZk
+ try {
+ val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+ .getOrElse(throw new RuntimeException(
+ s"leader not available for TopicAndPartition($topic, $partition)"))
+ ZkUtils.getBrokerInfo(zkClient, leader)
+ .getOrElse(throw new RuntimeException(s"broker info not found for leader $leader"))
+ } catch {
+ case e: Exception =>
+ LOG.error(e.getMessage)
+ throw e
+ } finally {
+ zkClient.close()
+ }
+ }
+
+ def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String])
+ : Array[TopicAndPartition] = {
+ val zkClient = connectZk
+ try {
+ ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap {
+ case (topic, partitions) => partitions.map(TopicAndPartition(topic, _))
+ }.toArray
+ } catch {
+ case e: Exception =>
+ LOG.error(e.getMessage)
+ throw e
+ } finally {
+ zkClient.close()
+ }
+ }
+
+ def topicExists(connectZk: => ZkClient, topic: String): Boolean = {
+ val zkClient = connectZk
+ try {
+ AdminUtils.topicExists(zkClient, topic)
+ } catch {
+ case e: Exception =>
+ LOG.error(e.getMessage)
+ throw e
+ } finally {
+ zkClient.close()
+ }
+ }
+
+ /**
+ * Creates a new kafka topic if it does not already exist.
+ * @return true if the topic already exists, false otherwise
+ */
+ def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int)
+ : Boolean = {
+ val zkClient = connectZk
+ try {
+ if (AdminUtils.topicExists(zkClient, topic)) {
+ LOG.info(s"topic $topic exists")
+ true
+ } else {
+ AdminUtils.createTopic(zkClient, topic, partitions, replicas)
+ LOG.info(s"created topic $topic")
+ false
+ }
+ } catch {
+ case e: Exception =>
+ LOG.error(e.getMessage)
+ throw e
+ } finally {
+ zkClient.close()
+ }
+ }
+
+ def deleteTopic(connectZk: => ZkClient, topic: String): Unit = {
+ val zkClient = connectZk
+ try {
+ AdminUtils.deleteTopic(zkClient, topic)
+ } catch {
+ case e: Exception =>
+ LOG.error(e.getMessage)
+ } finally {
+ zkClient.close()
+ }
+ }
+
+ def connectZookeeper(config: ConsumerConfig): () => ZkClient = {
+ val zookeeperConnect = config.zkConnect
+ val sessionTimeout = config.zkSessionTimeoutMs
+ val connectionTimeout = config.zkConnectionTimeoutMs
+ () => new ZkClient(zookeeperConnect, sessionTimeout, connectionTimeout, ZKStringSerializer)
+ }
+
+ def loadProperties(filename: String): Properties = {
+ val props = new Properties()
+ var propStream: InputStream = null
+ try {
+ propStream = getClass.getClassLoader.getResourceAsStream(filename)
+ props.load(propStream)
+ } catch {
+ case e: Exception =>
+ LOG.error(s"$filename not found")
+ } finally {
+ if (propStream != null) {
+ propStream.close()
+ }
+ }
+ props
+ }
+
+ def createKafkaProducer[K, V](properties: Properties,
+ keySerializer: Serializer[K],
+ valueSerializer: Serializer[V]): KafkaProducer[K, V] = {
+ if (properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+ }
+ new KafkaProducer[K, V](properties, keySerializer, valueSerializer)
+ }
+
+ def buildProducerConfig(bootstrapServers: String): Properties = {
+ val properties = new Properties()
+ properties.setProperty("bootstrap.servers", bootstrapServers)
+ properties
+ }
+
+ def buildConsumerConfig(zkConnect: String): Properties = {
+ val properties = new Properties()
+ properties.setProperty("zookeeper.connect", zkConnect)
+ properties.setProperty("group.id", "gearpump")
+ properties
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
new file mode 100644
index 0000000..ce17f5a
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+/**
+ * Sleeps for an exponentially increasing duration each time,
+ * up to a maximum cap
+ *
+ * @param backOffMultiplier The factor by which the duration increases.
+ * @param initialDurationMs Time in milliseconds for initial sleep.
+ * @param maximumDurationMs Cap up to which we will increase the duration.
+ */
+private[consumer] class ExponentialBackoffSleeper(
+ backOffMultiplier: Double = 2.0,
+ initialDurationMs: Long = 100,
+ maximumDurationMs: Long = 10000) {
+
+ require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
+ require(initialDurationMs > 0, "initialDurationMs must be positive")
+ require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be >= initialDurationMs")
+
+ private var sleepDuration = initialDurationMs
+
+ def reset(): Unit = {
+ sleepDuration = initialDurationMs
+ }
+
+ def sleep(): Unit = {
+ Thread.sleep(sleepDuration)
+ setNextSleepDuration()
+ }
+
+ def getSleepDuration: Long = sleepDuration
+
+ def setNextSleepDuration(): Unit = {
+ val next = (sleepDuration * backOffMultiplier).toLong
+ sleepDuration = math.min(math.max(initialDurationMs, next), maximumDurationMs)
+ }
+}
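
With the defaults above, successive sleep() calls back off as follows (the class is
package-private to `consumer`, so this is only an illustration):

    val sleeper = new ExponentialBackoffSleeper()  // multiplier 2.0, 100 ms initial, 10000 ms cap
    // sleep durations: 100, 200, 400, 800, 1600, 3200, 6400, 10000, 10000, ... ms
    // reset() drops the duration back to the initial 100 ms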
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
new file mode 100644
index 0000000..8550207
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+import java.nio.channels.ClosedByInterruptException
+import java.util.concurrent.LinkedBlockingQueue
+
+import kafka.common.TopicAndPartition
+import kafka.consumer.ConsumerConfig
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.LogUtil
+
+object FetchThread {
+ private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread])
+
+ def apply(topicAndPartitions: Array[TopicAndPartition],
+ fetchThreshold: Int,
+ fetchSleepMS: Long,
+ startOffsetTime: Long,
+ consumerConfig: ConsumerConfig): FetchThread = {
+ val createConsumer = (tp: TopicAndPartition) =>
+ KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)
+
+ val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+ new FetchThread(topicAndPartitions, createConsumer, incomingQueue, fetchThreshold, fetchSleepMS)
+ }
+}
+
+/**
+ * A thread that fetches messages from multiple kafka.common.TopicAndPartitions and puts them
+ * onto a queue, which is asynchronously polled by a consumer
+ *
+ * @param createConsumer given a kafka.common.TopicAndPartition, creates a
+ * [[org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] to
+ * connect to it
+ * @param incomingQueue a queue to buffer incoming messages
+ * @param fetchThreshold above which thread should stop fetching messages
+ * @param fetchSleepMS interval to sleep when no more messages or hitting fetchThreshold
+ */
+private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
+ createConsumer: TopicAndPartition => KafkaConsumer,
+ incomingQueue: LinkedBlockingQueue[KafkaMessage],
+ fetchThreshold: Int,
+ fetchSleepMS: Long) extends Thread {
+ import org.apache.gearpump.streaming.kafka.lib.consumer.FetchThread._
+
+ private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers
+
+ def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = {
+ consumers(tp).setStartOffset(startOffset)
+ }
+
+ def poll: Option[KafkaMessage] = {
+ Option(incomingQueue.poll())
+ }
+
+ override def run(): Unit = {
+ try {
+ var nextOffsets = Map.empty[TopicAndPartition, Long]
+ var reset = false
+ val sleeper = new ExponentialBackoffSleeper(
+ backOffMultiplier = 2.0,
+ initialDurationMs = 100L,
+ maximumDurationMs = 10000L)
+ while (!Thread.currentThread().isInterrupted) {
+ try {
+ if (reset) {
+ nextOffsets = consumers.mapValues(_.getNextOffset)
+ resetConsumers(nextOffsets)
+ reset = false
+ }
+ val hasMoreMessages = fetchMessage
+ sleeper.reset()
+ if (!hasMoreMessages) {
+ Thread.sleep(fetchSleepMS)
+ }
+ } catch {
+ case exception: Exception =>
+ LOG.warn(s"resetting consumers due to $exception")
+ reset = true
+ sleeper.sleep()
+ }
+ }
+ } catch {
+ case e: InterruptedException => LOG.info("fetch thread got interrupted exception")
+ case e: ClosedByInterruptException => LOG.info("fetch thread closed by interrupt exception")
+ } finally {
+ consumers.values.foreach(_.close())
+ }
+ }
+
+ /**
+ * Fetches messages from each TopicAndPartition in a round-robin way
+ * @return true if any message was fetched or the incoming queue is above the fetch threshold
+ */
+ def fetchMessage: Boolean = {
+ consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
+ val (_, consumer) = tpAndConsumer
+ if (incomingQueue.size < fetchThreshold) {
+ if (consumer.hasNext) {
+ incomingQueue.put(consumer.next())
+ true
+ } else {
+ hasNext
+ }
+ } else {
+ true
+ }
+ }
+ }
+
+ private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = {
+ topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap
+ }
+
+ private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit = {
+ consumers.values.foreach(_.close())
+ consumers = createAllConsumers
+ consumers.foreach { case (tp, consumer) =>
+ consumer.setStartOffset(nextOffsets(tp))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
new file mode 100644
index 0000000..55c327b
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.common.ErrorMapping._
+import kafka.common.TopicAndPartition
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.MessageAndOffset
+import kafka.utils.Utils
+
+import org.apache.gearpump.streaming.kafka.lib.KafkaUtil
+
+object KafkaConsumer {
+ def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig)
+ : KafkaConsumer = {
+ val connectZk = KafkaUtil.connectZookeeper(config)
+ val broker = KafkaUtil.getBroker(connectZk(), topic, partition)
+ val soTimeout = config.socketTimeoutMs
+ val soBufferSize = config.socketReceiveBufferBytes
+ val fetchSize = config.fetchMessageMaxBytes
+ val clientId = config.clientId
+ val consumer = new SimpleConsumer(broker.host, broker.port, soTimeout, soBufferSize, clientId)
+ val getIterator = (offset: Long) => {
+ val request = new FetchRequestBuilder()
+ .addFetch(topic, partition, offset, fetchSize)
+ .build()
+
+ val response = consumer.fetch(request)
+ response.errorCode(topic, partition) match {
+ case NoError => response.messageSet(topic, partition).iterator
+ case error => throw exceptionFor(error)
+ }
+ }
+ new KafkaConsumer(consumer, topic, partition, getIterator, startOffsetTime)
+ }
+}
+
+/**
+ * Uses kafka.consumer.SimpleConsumer to consume and iterate over
+ * messages from a kafka.common.TopicAndPartition.
+ */
+class KafkaConsumer(consumer: SimpleConsumer,
+ topic: String,
+ partition: Int,
+ getIterator: (Long) => Iterator[MessageAndOffset],
+ startOffsetTime: Long = OffsetRequest.EarliestTime) {
+ private val earliestOffset = consumer
+ .earliestOrLatestOffset(TopicAndPartition(topic, partition), startOffsetTime, -1)
+ private var nextOffset: Long = earliestOffset
+ private var iterator: Iterator[MessageAndOffset] = getIterator(nextOffset)
+
+ def setStartOffset(startOffset: Long): Unit = {
+ nextOffset = startOffset
+ iterator = getIterator(nextOffset)
+ }
+
+ def next(): KafkaMessage = {
+ val mo = iterator.next()
+ val message = mo.message
+
+ nextOffset = mo.nextOffset
+
+ val offset = mo.offset
+ val payload = Utils.readBytes(message.payload)
+ new KafkaMessage(topic, partition, offset, Option(message.key).map(Utils.readBytes), payload)
+ }
+
+ def hasNext: Boolean = {
+ @annotation.tailrec
+ def hasNextHelper(iter: Iterator[MessageAndOffset], newIterator: Boolean): Boolean = {
+ if (iter.hasNext) true
+ else if (newIterator) false
+ else {
+ iterator = getIterator(nextOffset)
+ hasNextHelper(iterator, newIterator = true)
+ }
+ }
+ hasNextHelper(iterator, newIterator = false)
+ }
+
+ def getNextOffset: Long = nextOffset
+
+ def close(): Unit = {
+ consumer.close()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
new file mode 100644
index 0000000..e0813d9
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Wrapper over messages from kafka
+ * @param topicAndPartition where message comes from
+ * @param offset message offset on kafka queue
+ * @param key message key, could be None
+ * @param msg message payload
+ */
+case class KafkaMessage(topicAndPartition: TopicAndPartition, offset: Long,
+ key: Option[Array[Byte]], msg: Array[Byte]) {
+
+ def this(topic: String, partition: Int, offset: Long,
+ key: Option[Array[Byte]], msg: Array[Byte]) = {
+ this(TopicAndPartition(topic, partition), offset, key, msg)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
new file mode 100644
index 0000000..b34bf09
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.grouper
+
+import kafka.common.TopicAndPartition
+
+/**
+ * The default grouper groups TopicAndPartitions among StreamProducers round-robin by index.
+ *
+ * e.g. given 2 topics (topicA with 2 partitions and topicB with 3 partitions) and
+ * 2 streamProducers (streamProducer0 and streamProducer1),
+ *
+ * streamProducer0 gets (topicA, partition1), (topicB, partition1) and (topicB, partition3);
+ * streamProducer1 gets (topicA, partition2), (topicB, partition2)
+ */
+class KafkaDefaultGrouper extends KafkaGrouper {
+ def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+ : Array[TopicAndPartition] = {
+ topicAndPartitions.indices.filter(_ % taskNum == taskIndex)
+ .map(i => topicAndPartitions(i)).toArray
+ }
+}
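
The modulo-on-index rule in group() reproduces the assignment from the Scaladoc, assuming
the five partitions arrive in the order below; a quick worked example:

    import kafka.common.TopicAndPartition

    val tps = Array(
      TopicAndPartition("topicA", 1), TopicAndPartition("topicA", 2),
      TopicAndPartition("topicB", 1), TopicAndPartition("topicB", 2),
      TopicAndPartition("topicB", 3))

    val grouper = new KafkaDefaultGrouper
    grouper.group(2, 0, tps)  // indices 0, 2, 4 => (topicA,1), (topicB,1), (topicB,3)
    grouper.group(2, 1, tps)  // indices 1, 3    => (topicA,2), (topicB,2)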
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
new file mode 100644
index 0000000..e2f5203
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.grouper
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Dispatches kafka.common.TopicAndPartitions to gearpump tasks
+ */
+trait KafkaGrouper {
+ def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+ : Array[TopicAndPartition]
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
deleted file mode 100644
index 2b00414..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka
-
-import com.twitter.bijection.Injection
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.streaming.MockUtil
-
-class KafkaSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
- val dataGen = for {
- topic <- Gen.alphaStr
- key <- Gen.alphaStr
- msg <- Gen.alphaStr
- } yield (topic, Injection[String, Array[Byte]](key), Injection[String, Array[Byte]](msg))
-
- property("KafkaSink write should send producer record") {
- forAll(dataGen) {
- (data: (String, Array[Byte], Array[Byte])) =>
- val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
- val (topic, key, msg) = data
- val kafkaSink = new KafkaSink(() => producer, topic)
- kafkaSink.write(Message((key, msg)))
- verify(producer).send(MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](
- r => r.topic == topic && (r.key sameElements key) && (r.value sameElements msg)))
- kafkaSink.write(Message(msg))
- verify(producer).send(MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](
- r => r.topic() == topic && (r.key == null) && (r.value() sameElements msg)
- ))
- kafkaSink.close()
- }
- }
-
- property("KafkaSink close should close kafka producer") {
- val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
- val kafkaSink = new KafkaSink(() => producer, "topic")
- kafkaSink.close()
- verify(producer).close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
deleted file mode 100644
index 7c804f7..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka
-
-import scala.util.{Failure, Success}
-
-import com.twitter.bijection.Injection
-import kafka.common.TopicAndPartition
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage}
-import io.gearpump.streaming.kafka.lib.{KafkaOffsetManager, KafkaSourceConfig}
-import io.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
-import io.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter}
-
-class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
- val startTimeGen = Gen.choose[Long](0L, 1000L)
- val offsetGen = Gen.choose[Long](0L, 1000L)
-
- property("KafkaSource open sets consumer to earliest offset") {
- val topicAndPartition = mock[TopicAndPartition]
- val fetchThread = mock[FetchThread]
- val offsetManager = mock[KafkaOffsetManager]
- val messageDecoder = mock[MessageDecoder]
- val timestampFilter = mock[TimeStampFilter]
- val offsetStorageFactory = mock[OffsetStorageFactory]
- val kafkaConfig = mock[KafkaSourceConfig]
- val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
- timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
-
- kafkaSource.setStartTime(None)
-
- verify(fetchThread).start()
- verify(fetchThread, never()).setStartOffset(anyObject[TopicAndPartition](), anyLong())
- }
-
- property("KafkaSource open should not set consumer start offset if offset storage is empty") {
- forAll(startTimeGen) { (startTime: Long) =>
- val offsetManager = mock[KafkaOffsetManager]
- val topicAndPartition = mock[TopicAndPartition]
- val fetchThread = mock[FetchThread]
- val messageDecoder = mock[MessageDecoder]
- val timestampFilter = mock[TimeStampFilter]
- val offsetStorageFactory = mock[OffsetStorageFactory]
- val kafkaConfig = mock[KafkaSourceConfig]
- val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
- timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
-
- when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(StorageEmpty))
-
- source.setStartTime(Some(startTime))
- verify(fetchThread, never()).setStartOffset(anyObject[TopicAndPartition](), anyLong())
- verify(fetchThread).start()
-
- when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new RuntimeException))
- intercept[RuntimeException] {
- source.setStartTime(Some(startTime))
- }
- source.close()
- }
- }
-
- property("KafkaSource open should set consumer start offset if offset storage is not empty") {
- forAll(startTimeGen, offsetGen) {
- (startTime: Long, offset: Long) =>
- val offsetManager = mock[KafkaOffsetManager]
- val topicAndPartition = mock[TopicAndPartition]
- val fetchThread = mock[FetchThread]
- val messageDecoder = mock[MessageDecoder]
- val timestampFilter = mock[TimeStampFilter]
- val offsetStorageFactory = mock[OffsetStorageFactory]
- val kafkaConfig = mock[KafkaSourceConfig]
- val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
- timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
-
- when(offsetManager.resolveOffset(startTime)).thenReturn(Success(offset))
-
- source.setStartTime(Some(startTime))
- verify(fetchThread).setStartOffset(topicAndPartition, offset)
- verify(fetchThread).start()
-
- when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new RuntimeException))
- intercept[RuntimeException] {
- source.setStartTime(Some(startTime))
- }
- source.close()
- }
- }
-
- property("KafkaSource read should return number of messages in best effort") {
- val kafkaMsgGen = for {
- topic <- Gen.alphaStr
- partition <- Gen.choose[Int](0, 1000)
- offset <- Gen.choose[Long](0L, 1000L)
- key = None
- msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
- } yield KafkaMessage(TopicAndPartition(topic, partition), offset, key, msg)
- val msgQueueGen = Gen.containerOf[Array, KafkaMessage](kafkaMsgGen)
- forAll(msgQueueGen) {
- (msgQueue: Array[KafkaMessage]) =>
- val offsetManager = mock[KafkaOffsetManager]
- val fetchThread = mock[FetchThread]
- val messageDecoder = mock[MessageDecoder]
-
- val timestampFilter = mock[TimeStampFilter]
- val offsetStorageFactory = mock[OffsetStorageFactory]
- val kafkaConfig = mock[KafkaSourceConfig]
- val offsetManagers = msgQueue.map(_.topicAndPartition -> offsetManager).toMap
-
- val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
- timestampFilter, Some(fetchThread), offsetManagers)
-
- if (msgQueue.isEmpty) {
- when(fetchThread.poll).thenReturn(None)
- source.read() shouldBe null
- } else {
- msgQueue.indices.foreach { i =>
- val message = Message(msgQueue(i).msg)
- when(fetchThread.poll).thenReturn(Option(msgQueue(i)))
- when(messageDecoder.fromBytes(anyObject[Array[Byte]])).thenReturn(message)
- when(offsetManager.filter(anyObject[(Message, Long)])).thenReturn(Some(message))
- when(timestampFilter.filter(anyObject[Message], anyLong())).thenReturn(Some(message))
-
- source.read shouldBe message
- }
- }
- source.close()
- }
- }
-
- property("KafkaSource close should close all offset managers") {
- val offsetManager = mock[KafkaOffsetManager]
- val topicAndPartition = mock[TopicAndPartition]
- val fetchThread = mock[FetchThread]
- val timestampFilter = mock[TimeStampFilter]
- val messageDecoder = mock[MessageDecoder]
- val offsetStorageFactory = mock[OffsetStorageFactory]
- val kafkaConfig = mock[KafkaSourceConfig]
- val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
- timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
- source.close()
- verify(offsetManager).close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
deleted file mode 100644
index f3b5425..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import com.twitter.bijection.Injection
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
- property("DefaultMessageDecoder should keep the original bytes data in Message") {
- val decoder = new DefaultMessageDecoder()
- forAll(Gen.alphaStr) { (s: String) =>
- val bytes = Injection[String, Array[Byte]](s)
- decoder.fromBytes(bytes).msg shouldBe bytes
- }
- }
-}
-
-class StringMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
- property("StringMessageDecoder should decode original bytes data into string") {
- val decoder = new StringMessageDecoder()
- forAll(Gen.alphaStr) { (s: String) =>
- val bytes = Injection[String, Array[Byte]](s)
- decoder.fromBytes(bytes).msg shouldBe s
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
deleted file mode 100644
index c762b06..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import scala.util.{Failure, Success}
-
-import com.twitter.bijection.Injection
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.streaming.transaction.api.OffsetStorage
-import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
-
-class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
- val timeStampGen = Gen.choose[Long](0L, 1000L)
- val messageGen = for {
- msg <- Gen.alphaStr
- time <- timeStampGen
- } yield Message(msg, time)
-
- val messageAndOffsetsGen = Gen.listOf[Message](messageGen).map(_.zipWithIndex)
-
- property("KafkaOffsetManager should append offset to storage in monotonically" +
- " increasing time order") {
- forAll(messageAndOffsetsGen) { (messageAndOffsets: List[(Message, Int)]) =>
- val offsetStorage = mock[OffsetStorage]
- val offsetManager = new KafkaOffsetManager(offsetStorage)
- messageAndOffsets.foldLeft(0L) { (max, messageAndOffset) =>
- val (message, offset) = messageAndOffset
- offsetManager.filter((message, offset.toLong)) shouldBe Option(message)
- if (message.timestamp > max) {
- val newMax = message.timestamp
- verify(offsetStorage).append(newMax, Injection[Long, Array[Byte]](offset.toLong))
- newMax
- } else {
- verifyZeroInteractions(offsetStorage)
- max
- }
- }
- offsetManager.close()
- }
- }
-
- val minTimeStampGen = Gen.choose[Long](0L, 500L)
- val maxTimeStampGen = Gen.choose[Long](500L, 1000L)
- property("KafkaOffsetManager resolveOffset should " +
- "report StorageEmpty failure when storage is empty") {
- forAll(timeStampGen) { (time: Long) =>
- val offsetStorage = mock[OffsetStorage]
- val offsetManager = new KafkaOffsetManager(offsetStorage)
- when(offsetStorage.lookUp(time)).thenReturn(Failure(StorageEmpty))
- offsetManager.resolveOffset(time) shouldBe Failure(StorageEmpty)
-
- doThrow(new RuntimeException).when(offsetStorage).lookUp(time)
- intercept[RuntimeException] {
- offsetManager.resolveOffset(time)
- }
- offsetManager.close()
- }
- }
-
- val offsetGen = Gen.choose[Long](0L, 1000L)
- property("KafkaOffsetManager resolveOffset should return a valid" +
- " offset when storage is not empty") {
- forAll(timeStampGen, minTimeStampGen, maxTimeStampGen, offsetGen) {
- (time: Long, min: Long, max: Long, offset: Long) =>
- val offsetStorage = mock[OffsetStorage]
- val offsetManager = new KafkaOffsetManager(offsetStorage)
- if (time < min) {
- when(offsetStorage.lookUp(time)).thenReturn(Failure(
- Underflow(Injection[Long, Array[Byte]](min))))
- offsetManager.resolveOffset(time) shouldBe Success(min)
- } else if (time > max) {
- when(offsetStorage.lookUp(time)).thenReturn(Failure(
- Overflow(Injection[Long, Array[Byte]](max))))
- offsetManager.resolveOffset(time) shouldBe Success(max)
- } else {
- when(offsetStorage.lookUp(time)).thenReturn(Success(Injection[Long, Array[Byte]](offset)))
- offsetManager.resolveOffset(time) shouldBe Success(offset)
- }
-
- doThrow(new RuntimeException).when(offsetStorage).lookUp(time)
- intercept[RuntimeException] {
- offsetManager.resolveOffset(time)
- }
- offsetManager.close()
- }
- }
-
- property("KafkaOffsetManager close should close offset storage") {
- val offsetStorage = mock[OffsetStorage]
- val offsetManager = new KafkaOffsetManager(offsetStorage)
- offsetManager.close()
- verify(offsetStorage).close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
deleted file mode 100644
index af23c12..0000000
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-// TODO: Fix the UT failure!
-
-// class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-// val minTimeGen = Gen.choose[Long](1L, 500L)
-// val maxTimeGen = Gen.choose[Long](500L, 999L)
-//
-// property("KafkaStorage lookup time should report StorageEmpty if storage is empty") {
-// forAll { (time: Long, topic: String) =>
-// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-// val getConsumer = () => mock[KafkaConsumer]
-// val connectZk = () => mock[ZkClient]
-// val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(),
-// connectZk())
-// storage.lookUp(time) shouldBe Failure(StorageEmpty)
-// }
-// }
-//
-// property("KafkaStorage lookup time should return data or report failure if storage not empty") {
-// forAll(minTimeGen, maxTimeGen, Gen.alphaStr) {(minTime: Long, maxTime: Long, topic: String) =>
-// val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) =>
-// val offset = index.toLong
-// time -> offset
-// }
-// val timeAndOffsetsMap = timeAndOffsets.toMap
-// val data = timeAndOffsets.map {
-// case (time, offset) =>
-// new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)),
-// Injection[Long, Array[Byte]](offset))
-// }.toList
-//
-// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-// val consumer = mock[KafkaConsumer]
-// val getConsumer = () => consumer
-// val connectZk = () => mock[ZkClient]
-//
-// val hasNexts = List.fill(data.tail.size)(true) :+ false
-// when(consumer.hasNext).thenReturn(true, hasNexts:_*)
-// when(consumer.next).thenReturn(data.head, data.tail:_*)
-//
-// val storage = new KafkaStorage(topic, topicExists = true, producer,
-// getConsumer(), connectZk())
-// forAll(Gen.choose[Long](minTime, maxTime)) {
-// time =>
-// storage.lookUp(time) match {
-// case Success(array) =>
-// array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time)))
-// case Failure(e) => fail("time in range should return Success with value")
-// }
-// }
-//
-// forAll(Gen.choose[Long](0L, minTime - 1)) {
-// time =>
-// storage.lookUp(time) match {
-// case Failure(e) => e shouldBe a [Underflow]
-// e.asInstanceOf[Underflow].min should equal
-// (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime)))
-// case Success(_) => fail("time less than min should return Underflow failure")
-// }
-// }
-//
-// forAll(Gen.choose[Long](maxTime + 1, 1000L)) {
-// time =>
-// storage.lookUp(time) match {
-// case Failure(e) => e shouldBe a [Overflow]
-// e.asInstanceOf[Overflow].max should equal
-// (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime)))
-// case Success(_) => fail("time larger than max should return Overflow failure")
-// }
-// }
-// }
-// }
-//
-// property("KafkaStorage append should send data to Kafka") {
-// forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000),
-// Gen.alphaStr, Gen.oneOf(true, false)) {
-// (time: Long, offset: Long, topic: String, topicExists: Boolean) =>
-// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-// val getConsumer = () => mock[KafkaConsumer]
-// val connectZk = () => mock[ZkClient]
-// val storage = new KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk())
-// val offsetBytes = Injection[Long, Array[Byte]](offset)
-// storage.append(time, offsetBytes)
-// verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]())
-// }
-// }
-//
-// val topicAndPartitionGen = for {
-// topic <- Gen.alphaStr
-// partition <- Gen.choose[Int](0, 100)
-// } yield TopicAndPartition(topic, partition)
-// property("KafkaStorage should load data from Kafka") {
-// val kafkaMsgGen = for {
-// timestamp <- Gen.choose[Long](1L, 1000L)
-// offset <- Gen.choose[Long](0L, 1000L)
-// } yield (timestamp, Injection[Long, Array[Byte]](offset))
-// val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen)
-//
-// val topicExistsGen = Gen.oneOf(true, false)
-//
-// forAll(topicAndPartitionGen, msgListGen) {
-// (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) =>
-// val producer= mock[KafkaProducer[Array[Byte], Array[Byte]]]
-// val consumer = mock[KafkaConsumer]
-// val getConsumer = () => consumer
-// val connectZk = () => mock[ZkClient]
-// val kafkaStorage = new KafkaStorage(topicAndPartition.topic,
-// topicExists = true, producer, getConsumer(), connectZk())
-// msgList match {
-// case Nil =>
-// when(consumer.hasNext).thenReturn(false)
-// case list =>
-// val hasNexts = List.fill(list.tail.size)(true) :+ false
-// val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) =>
-// KafkaMessage(topicAndPartition, index.toLong,
-// Some(Injection[Long, Array[Byte]](timestamp)), bytes)
-// }
-// when(consumer.hasNext).thenReturn(true, hasNexts: _*)
-// when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*)
-// }
-// kafkaStorage.load(consumer) shouldBe msgList
-// }
-// }
-//
-// property("KafkaStorage should not get consumer when topic doesn't exist") {
-// forAll(Gen.alphaStr) { (topic: String) =>
-// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-// val getConsumer = mock[() => KafkaConsumer]
-// val connectZk = () => mock[ZkClient]
-// val kafkaStorage = new KafkaStorage(topic,
-// topicExists = false, producer, getConsumer(), connectZk())
-// verify(getConsumer, never()).apply()
-// kafkaStorage.close()
-// }
-// }
-//
-// property("KafkaStorage should fail to load invalid KafkaMessage") {
-// val invalidKafkaMsgGen = for {
-// tp <- topicAndPartitionGen
-// offset <- Gen.choose[Long](1L, 1000L)
-// timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))),
-// None)
-// msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
-// } yield KafkaMessage(tp, offset, timestamp, msg)
-// forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) =>
-// val consumer = mock[KafkaConsumer]
-// val getConsumer = () => consumer
-// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-// val connectZk = () => mock[ZkClient]
-// val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic,
-// topicExists = true, producer, getConsumer(), connectZk())
-// when(consumer.hasNext).thenReturn(true, false)
-// when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg)
-// Try(kafkaStorage.load(consumer)).isFailure shouldBe true
-// }
-// }
-//
-// property("KafkaStorage close should close kafka producer and delete topic") {
-// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-// val getConsumer = () => mock[KafkaConsumer]
-// val zkClient = mock[ZkClient]
-// val connectZk = () => zkClient
-// val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk())
-// kafkaStorage.close()
-// verify(producer).close()
-// verify(zkClient).createPersistent(anyString(), anyString())
-// }
-// }