You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:43 UTC
[33/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
index 968834e..3dede8e 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,21 +19,20 @@
package io.gearpump.streaming.kafka
import java.util.Properties
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success}
+
+import kafka.common.TopicAndPartition
+import org.slf4j.Logger
-import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig, KafkaUtil, KafkaOffsetManager}
import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage}
+import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaOffsetManager, KafkaSourceConfig, KafkaUtil}
import io.gearpump.streaming.source.DefaultTimeStampFilter
import io.gearpump.streaming.task.TaskContext
+import io.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
import io.gearpump.streaming.transaction.api._
-import kafka.common.TopicAndPartition
-import OffsetStorage.StorageEmpty
import io.gearpump.util.LogUtil
import io.gearpump.{Message, TimeStamp}
-import org.slf4j.Logger
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Success}
-
object KafkaSource {
private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource])
@@ -44,15 +43,19 @@ object KafkaSource {
* from multiple Kafka TopicAndPartition in a round-robin way.
*
* This is a TimeReplayableSource which is able to replay messages given a start time.
- * Each kafka message is tagged with a timestamp by [[io.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp) mapping
- * is stored to a [[OffsetStorage]]. On recovery, we could retrieve the previously stored offset
- * from the [[OffsetStorage]] by timestamp and start to read from there.
+ * Each kafka message is tagged with a timestamp by
+ * [[io.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp) mapping
+ * is stored to a [[io.gearpump.streaming.transaction.api.OffsetStorage]]. On recovery,
+ * we could retrieve the previously stored offset from the
+ * [[io.gearpump.streaming.transaction.api.OffsetStorage]] by timestamp and start to read from
+ * there.
*
- * kafka message is wrapped into gearpump [[Message]] and further filtered by a [[TimeStampFilter]]
+ * Each kafka message is wrapped into a gearpump [[io.gearpump.Message]] and further
+ * filtered by a [[io.gearpump.streaming.transaction.api.TimeStampFilter]]
* such that obsolete messages are dropped.
*
* @param config kafka source config
- * @param messageDecoder decodes [[Message]] from raw bytes
+ * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes
* @param timestampFilter filters out message based on timestamp
* @param fetchThread fetches messages and puts on a in-memory queue
* @param offsetManagers manages offset-to-timestamp storage for each kafka.common.TopicAndPartition
@@ -63,59 +66,66 @@ class KafkaSource(
messageDecoder: MessageDecoder = new DefaultMessageDecoder,
timestampFilter: TimeStampFilter = new DefaultTimeStampFilter,
private var fetchThread: Option[FetchThread] = None,
- private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = Map.empty[TopicAndPartition, KafkaOffsetManager])
- extends TimeReplayableSource {
- import KafkaSource._
+ private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = {
+ Map.empty[TopicAndPartition, KafkaOffsetManager]
+ }) extends TimeReplayableSource {
+ import io.gearpump.streaming.kafka.KafkaSource._
private var startTime: Option[TimeStamp] = None
-
/**
+ * Constructs a KafkaSource from topics and kafka consumer properties.
+ *
* @param topics comma-separated string of topics
* @param properties kafka consumer config
* @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
- * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
+ * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
*
*/
def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory) = {
this(KafkaSourceConfig(properties).withConsumerTopics(topics), offsetStorageFactory)
}
/**
+ * Constructs a KafkaSource with a custom message decoder and timestamp filter.
+ *
* @param topics comma-separated string of topics
* @param properties kafka consumer config
* @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
- * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
- * @param messageDecoder decodes [[Message]] from raw bytes
+ * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
+ * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes
* @param timestampFilter filters out message based on timestamp
*/
def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory,
- messageDecoder: MessageDecoder,
- timestampFilter: TimeStampFilter) = {
+ messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter) = {
this(KafkaSourceConfig(properties)
.withConsumerTopics(topics), offsetStorageFactory,
messageDecoder, timestampFilter)
}
/**
+ * Constructs a KafkaSource from topics and a zkConnect string.
+ *
* @param topics comma-separated string of topics
* @param zkConnect kafka consumer config `zookeeper.connect`
* @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
- * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
+ * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
*/
def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory) =
this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory)
/**
+ * Constructs a KafkaSource from zkConnect, with a custom decoder and timestamp filter.
+ *
* @param topics comma-separated string of topics
* @param zkConnect kafka consumer config `zookeeper.connect`
* @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
- * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
- * @param messageDecoder decodes [[Message]] from raw bytes
+ * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
+ * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes
* @param timestampFilter filters out message based on timestamp
*/
def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory,
- messageDecoder: MessageDecoder,
- timestampFilter: TimeStampFilter) = {
+ messageDecoder: MessageDecoder,
+ timestampFilter: TimeStampFilter) = {
this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory,
messageDecoder, timestampFilter)
}
@@ -191,5 +201,4 @@ class KafkaSource(
override def close(): Unit = {
offsetManagers.foreach(_._2.close())
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
index eacd267..e50bf84 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,36 +19,37 @@
package io.gearpump.streaming.kafka
import java.util.Properties
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
import com.twitter.bijection.Injection
-import io.gearpump.streaming.kafka.lib.KafkaUtil
-import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
-import io.gearpump.streaming.transaction.api.{OffsetStorageFactory, OffsetStorage}
import kafka.api.OffsetRequest
import kafka.consumer.ConsumerConfig
import org.I0Itec.zkclient.ZkClient
-import io.gearpump.TimeStamp
-import OffsetStorage.{Overflow, StorageEmpty, Underflow}
-import io.gearpump.util.LogUtil
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.slf4j.Logger
-import scala.collection.mutable
-import scala.util.{Failure, Success, Try}
+import io.gearpump.TimeStamp
+import io.gearpump.streaming.kafka.lib.KafkaUtil
+import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
+import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
+import io.gearpump.streaming.transaction.api.{OffsetStorage, OffsetStorageFactory}
+import io.gearpump.util.LogUtil
/**
- * factory that builds [[KafkaStorage]]
+ * Factory that builds [[KafkaStorage]]
*
* @param consumerProps kafka consumer config
* @param producerProps kafka producer config
*/
-class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties) extends OffsetStorageFactory {
+class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties)
+ extends OffsetStorageFactory {
/**
- *
- * this creates consumer config properties with `zookeeper.connect` set to zkConnect
+ * Creates consumer config properties with `zookeeper.connect` set to zkConnect
* and producer config properties with `bootstrap.servers` set to bootstrapServers
+ *
* @param zkConnect kafka consumer config `zookeeper.connect`
* @param bootstrapServers kafka producer config `bootstrap.servers`
*/
@@ -70,7 +71,8 @@ object KafkaStorage {
}
/**
- * this stores offset-timestamp mapping to kafka
+ * Stores offset-timestamp mapping to kafka
+ *
* @param topic kafka store topic
* @param producer kafka producer
* @param getConsumer function to get kafka consumer
@@ -83,11 +85,10 @@ class KafkaStorage private[kafka](
connectZk: => ZkClient)
extends OffsetStorage {
-
private lazy val consumer = getConsumer
private val dataByTime: List[(TimeStamp, Array[Byte])] = {
- if (KafkaUtil.topicExists(connectZk, topic)){
+ if (KafkaUtil.topicExists(connectZk, topic)) {
load(consumer)
} else {
List.empty[(TimeStamp, Array[Byte])]
@@ -95,8 +96,9 @@ class KafkaStorage private[kafka](
}
/**
- * offsets with timestamp < `time` have already been processed by the system
- * so we look up the storage for the first offset with timestamp >= `time` on replay
+ * Offsets with timestamp less than `time` have already been processed by the system,
+ * so on replay we look up the storage for the first offset with timestamp greater than
+ * or equal to `time`.
*
* @param time the timestamp to look up for the earliest unprocessed offset
* @return the earliest unprocessed offset if `time` is in the range, otherwise failure
@@ -143,5 +145,4 @@ class KafkaStorage private[kafka](
consumer.close()
messagesBuilder.result().toList
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
index 5c700be..2a852e2 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,6 +24,8 @@ import io.gearpump.streaming.dsl
import io.gearpump.streaming.kafka.KafkaSink
class KafkaDSLSink[T](stream: dsl.Stream[T]) {
+
+ /** Create a Kafka DSL Sink */
def writeToKafka(
topic: String,
bootstrapServers: String,
@@ -42,6 +44,9 @@ class KafkaDSLSink[T](stream: dsl.Stream[T]) {
}
object KafkaDSLSink {
+
+ import scala.language.implicitConversions
+
implicit def streamToKafkaDSLSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] = {
new KafkaDSLSink[T](stream)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
index 7f752e5..325b40f 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,10 +20,10 @@ package io.gearpump.streaming.kafka.dsl
import java.util.Properties
import io.gearpump.streaming.dsl
-import io.gearpump.streaming.dsl.{StreamApp}
+import io.gearpump.streaming.dsl.StreamApp
import io.gearpump.streaming.kafka.KafkaSource
import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig}
-import io.gearpump.streaming.transaction.api.{OffsetStorageFactory, TimeStampFilter, MessageDecoder}
+import io.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter}
object KafkaDSLUtil {
def createStream[T](
@@ -33,7 +33,8 @@ object KafkaDSLUtil {
kafkaConfig: KafkaSourceConfig,
offsetStorageFactory: OffsetStorageFactory,
messageDecoder: MessageDecoder = new DefaultMessageDecoder): dsl.Stream[T] = {
- app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder), parallelism, description)
+ app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder),
+ parallelism, description)
}
def createStream[T](
@@ -43,8 +44,8 @@ object KafkaDSLUtil {
topics: String,
zkConnect: String,
offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
- app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory)
- , parallelism, description)
+ app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory),
+ parallelism, description)
}
def createStream[T](
@@ -56,8 +57,8 @@ object KafkaDSLUtil {
offsetStorageFactory: OffsetStorageFactory,
messageDecoder: MessageDecoder,
timestampFilter: TimeStampFilter): dsl.Stream[T] = {
- app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory, messageDecoder, timestampFilter)
- , parallelism, description)
+ app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory,
+ messageDecoder, timestampFilter), parallelism, description)
}
def createStream[T](
@@ -67,7 +68,8 @@ object KafkaDSLUtil {
topics: String,
properties: Properties,
offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
- app.source[T](new KafkaSource(topics, properties, offsetStorageFactory), parallelism, description)
+ app.source[T](new KafkaSource(topics, properties, offsetStorageFactory),
+ parallelism, description)
}
def createStream[T](
@@ -79,9 +81,8 @@ object KafkaDSLUtil {
offsetStorageFactory: OffsetStorageFactory,
messageDecoder: MessageDecoder,
timestampFilter: TimeStampFilter): dsl.Stream[T] = {
- app.source[T](new KafkaSource(topics, properties, offsetStorageFactory, messageDecoder, timestampFilter), parallelism, description)
+ app.source[T](new KafkaSource(topics, properties, offsetStorageFactory,
+ messageDecoder, timestampFilter), parallelism, description)
}
}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
index 873e614..f846efe 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,12 @@
package io.gearpump.streaming.kafka.lib
+import scala.util.{Failure, Success}
+
import com.twitter.bijection.Injection
-import io.gearpump.streaming.transaction.api.MessageDecoder
-import io.gearpump.Message
-import scala.util.{Failure, Success}
+import io.gearpump.Message
+import io.gearpump.streaming.transaction.api.MessageDecoder
class DefaultMessageDecoder extends MessageDecoder {
override def fromBytes(bytes: Array[Byte]): Message = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
index ecaa294..e9c95e3 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,23 +18,24 @@
package io.gearpump.streaming.kafka.lib
+import scala.util.{Failure, Success, Try}
+
import com.twitter.bijection.Injection
-import io.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage}
-import io.gearpump._
-import OffsetStorage.{Overflow, StorageEmpty, Underflow}
-import io.gearpump.util.LogUtil
import org.slf4j.Logger
-import scala.util.{Failure, Success, Try}
+import io.gearpump._
+import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
+import io.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage}
+import io.gearpump.util.LogUtil
object KafkaOffsetManager {
private val LOG: Logger = LogUtil.getLogger(classOf[KafkaOffsetManager])
}
private[kafka] class KafkaOffsetManager(storage: OffsetStorage) extends OffsetManager {
- import KafkaOffsetManager._
+ import io.gearpump.streaming.kafka.lib.KafkaOffsetManager._
- var maxTime: TimeStamp = 0L
+ var maxTime: TimeStamp = 0L
override def filter(messageAndOffset: (Message, Long)): Option[Message] = {
val (message, offset) = messageAndOffset
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
index 50905fc..123f3ac 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,13 +20,13 @@ package io.gearpump.streaming.kafka.lib
import java.util.Properties
-import io.gearpump.streaming.kafka.KafkaSource
-import io.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper}
-import io.gearpump.util.LogUtil
import kafka.api.OffsetRequest
import kafka.consumer.ConsumerConfig
import org.slf4j.Logger
+import io.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper}
+import io.gearpump.util.LogUtil
+
object KafkaSourceConfig {
val NAME = "kafka_config"
@@ -45,12 +45,14 @@ object KafkaSourceConfig {
}
/**
- * this class extends kafka kafka.consumer.ConsumerConfig with specific configs for [[KafkaSource]]
+ * Extends kafka.consumer.ConsumerConfig with specific config needed by
+ * [[io.gearpump.streaming.kafka.KafkaSource]]
*
* @param consumerProps kafka consumer config
*/
-class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends java.io.Serializable {
- import KafkaSourceConfig._
+class KafkaSourceConfig(val consumerProps: Properties = new Properties)
+ extends java.io.Serializable {
+ import io.gearpump.streaming.kafka.lib.KafkaSourceConfig._
if (!consumerProps.containsKey(ZOOKEEPER_CONNECT)) {
consumerProps.setProperty(ZOOKEEPER_CONNECT, "localhost:2181")
@@ -63,9 +65,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
def consumerConfig: ConsumerConfig = new ConsumerConfig(consumerProps)
/**
- * set kafka consumer topics
+ * Sets kafka consumer topics, separated by commas.
+ *
* @param topics comma-separated string
- * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.CONSUMER_TOPICS]] set to given value
+ * @return new KafkaConfig based on this but with
+ * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#CONSUMER_TOPICS]]
+ * set to given value
*/
def withConsumerTopics(topics: String): KafkaSourceConfig = {
consumerProps.setProperty(CONSUMER_TOPICS, topics)
@@ -73,18 +78,22 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
}
/**
- * @return a list of kafka consumer topics
+ * Returns a list of kafka consumer topics
*/
def getConsumerTopics: List[String] = {
Option(consumerProps.getProperty(CONSUMER_TOPICS)).getOrElse("topic1").split(",").toList
}
/**
- * [[consumer.FetchThread]] will sleep for a while if no more messages or
- * the incoming queue size is above the [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_THRESHOLD]]
- * this is to set sleep interval
+ * Sets the sleep interval used when there are no more messages or the buffer is full.
+ *
+ * Consumer.FetchThread will sleep for a while if no more messages or
+ * the incoming queue size is above the
+ * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]
+ *
* @param sleepMS sleep interval in milliseconds
- * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_SLEEP_MS]] set to given value
+ * @return new KafkaConfig based on this but with
+ * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_SLEEP_MS]] set to given value
*/
def withFetchSleepMS(sleepMS: Int): KafkaSourceConfig = {
consumerProps.setProperty(FETCH_SLEEP_MS, s"$sleepMS")
@@ -92,9 +101,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
}
/**
- * [[consumer.FetchThread]] will sleep for a while if no more messages or
- * the incoming queue size is above the [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_THRESHOLD]]
- * this is to get sleep interval
+ * Gets the sleep interval
+ *
+ * Consumer.FetchThread sleeps for a while if no more messages or
+ * the incoming queue is full (size is bigger than the
+ * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]])
+ *
* @return sleep interval in milliseconds
*/
def getFetchSleepMS: Int = {
@@ -102,10 +114,14 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
}
/**
- * [[consumer.FetchThread]] stops fetching new messages if its incoming queue
+ * Sets the fetch threshold, i.e. the incoming queue size at which fetching pauses.
+ *
+ * Consumer.FetchThread stops fetching new messages if its incoming queue
* size is above the threshold and starts again when the queue size is below it
+ *
* @param threshold queue size
- * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_THRESHOLD]] set to give value
+ * @return new KafkaConfig based on this but with
+ * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] set to given value
*/
def withFetchThreshold(threshold: Int): KafkaSourceConfig = {
consumerProps.setProperty(FETCH_THRESHOLD, s"$threshold")
@@ -113,9 +129,11 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
}
/**
+ * Returns the fetch threshold, i.e. the incoming queue size at which fetching pauses.
+ *
+ * Consumer.FetchThread stops fetching new messages if
+ * its incoming queue size is above the threshold and starts again when the queue size is below it
*
- * [[io.gearpump.streaming.kafka.lib.consumer.FetchThread]] stops fetching new messages if its incoming queue
- * size is above the threshold and starts again when the queue size is below it
* @return fetch threshold
*/
def getFetchThreshold: Int = {
@@ -123,11 +141,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
}
/**
- * set [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which
- * defines how kafka.common.TopicAndPartitions are mapped to source tasks
+ * Sets [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which
+ * defines how kafka.common.TopicAndPartitions are mapped to source tasks.
*
* @param className name of the factory class
- * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.GROUPER_CLASS]] set to given value
+ * @return new KafkaConfig based on this but with
+ * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#GROUPER_CLASS]] set to given value
*/
def withGrouper(className: String): KafkaSourceConfig = {
consumerProps.setProperty(GROUPER_CLASS, className)
@@ -135,13 +154,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
}
/**
- * get [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which
+ * Returns [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which
* defines how kafka.common.TopicAndPartitions are mapped to source tasks
- * @return
*/
def getGrouper: KafkaGrouper = {
- Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS)).getOrElse(classOf[KafkaDefaultGrouper].getName))
- .newInstance().asInstanceOf[KafkaGrouper]
+ Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS))
+ .getOrElse(classOf[KafkaDefaultGrouper].getName)).newInstance().asInstanceOf[KafkaGrouper]
}
def withConsumerStartOffset(earliestOrLatest: Long): KafkaSourceConfig = {
@@ -150,7 +168,8 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
}
def getConsumerStartOffset: Long = {
- Option(consumerProps.getProperty(CONSUMER_START_OFFSET)).getOrElse(s"${OffsetRequest.EarliestTime}").toLong
+ Option(consumerProps.getProperty(CONSUMER_START_OFFSET))
+ .getOrElse(s"${OffsetRequest.EarliestTime}").toLong
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
index 54a66ae..2f7fcf7 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,11 +27,12 @@ import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerConfig
import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
-import io.gearpump.util.LogUtil
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.Serializer
import org.slf4j.Logger
+import io.gearpump.util.LogUtil
+
object KafkaUtil {
private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -39,7 +40,8 @@ object KafkaUtil {
val zkClient = connectZk
try {
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
- .getOrElse(throw new RuntimeException(s"leader not available for TopicAndPartition($topic, $partition)"))
+ .getOrElse(throw new RuntimeException(
+ s"leader not available for TopicAndPartition($topic, $partition)"))
ZkUtils.getBrokerInfo(zkClient, leader)
.getOrElse(throw new RuntimeException(s"broker info not found for leader $leader"))
} catch {
@@ -51,12 +53,13 @@ object KafkaUtil {
}
}
- def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String]): Array[TopicAndPartition] = {
+ def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String])
+ : Array[TopicAndPartition] = {
val zkClient = connectZk
try {
- ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap {
- case (topic, partitions) => partitions.map(TopicAndPartition(topic, _))
- }.toArray
+ ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap {
+ case (topic, partitions) => partitions.map(TopicAndPartition(topic, _))
+ }.toArray
} catch {
case e: Exception =>
LOG.error(e.getMessage)
@@ -80,10 +83,11 @@ object KafkaUtil {
}
/**
- * create a new kafka topic
- * return true if topic already exists, and false otherwise
+ * Creates a new kafka topic.
+ * Returns true if the topic already exists, and false otherwise.
*/
- def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int): Boolean = {
+ def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int)
+ : Boolean = {
val zkClient = connectZk
try {
if (AdminUtils.topicExists(zkClient, topic)) {
@@ -132,15 +136,16 @@ object KafkaUtil {
case e: Exception =>
LOG.error(s"$filename not found")
} finally {
- if(propStream != null)
+ if (propStream != null) {
propStream.close()
+ }
}
props
}
def createKafkaProducer[K, V](properties: Properties,
- keySerializer: Serializer[K],
- valueSerializer: Serializer[V]): KafkaProducer[K, V] = {
+ keySerializer: Serializer[K],
+ valueSerializer: Serializer[V]): KafkaProducer[K, V] = {
if (properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
}
@@ -153,11 +158,10 @@ object KafkaUtil {
properties
}
- def buildConsumerConfig(zkConnect: String): Properties = {
- val properties = new Properties()
- properties.setProperty("zookeeper.connect", zkConnect)
- properties.setProperty("group.id", "gearpump")
- properties
+ def buildConsumerConfig(zkConnect: String): Properties = {
+ val properties = new Properties()
+ properties.setProperty("zookeeper.connect", zkConnect)
+ properties.setProperty("group.id", "gearpump")
+ properties
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
index b821f15..141ae98 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,15 +7,13 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package io.gearpump.streaming.kafka.lib.consumer
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
index ee53151..b2b3f4f 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,17 +23,18 @@ import java.util.concurrent.LinkedBlockingQueue
import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerConfig
-import io.gearpump.util.LogUtil
import org.slf4j.Logger
+import io.gearpump.util.LogUtil
+
object FetchThread {
private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread])
def apply(topicAndPartitions: Array[TopicAndPartition],
- fetchThreshold: Int,
- fetchSleepMS: Long,
- startOffsetTime: Long,
- consumerConfig: ConsumerConfig): FetchThread = {
+ fetchThreshold: Int,
+ fetchSleepMS: Long,
+ startOffsetTime: Long,
+ consumerConfig: ConsumerConfig): FetchThread = {
val createConsumer = (tp: TopicAndPartition) =>
KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)
@@ -43,20 +44,22 @@ object FetchThread {
}
/**
- * A thread to fetch messages from multiple kafka [[TopicAndPartition]]s and puts them
+ * A thread to fetch messages from multiple kafka org.apache.kafka.TopicAndPartition and puts them
* onto a queue, which is asynchronously polled by a consumer
*
- * @param createConsumer given a [[TopicAndPartition]], create a [[KafkaConsumer]] to connect to it
+ * @param createConsumer given a org.apache.kafka.TopicAndPartition, create a
+ * [[io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] to
+ * connect to it
* @param incomingQueue a queue to buffer incoming messages
* @param fetchThreshold above which thread should stop fetching messages
* @param fetchSleepMS interval to sleep when no more messages or hitting fetchThreshold
*/
private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
- createConsumer: TopicAndPartition => KafkaConsumer,
- incomingQueue: LinkedBlockingQueue[KafkaMessage],
- fetchThreshold: Int,
- fetchSleepMS: Long) extends Thread {
- import FetchThread._
+ createConsumer: TopicAndPartition => KafkaConsumer,
+ incomingQueue: LinkedBlockingQueue[KafkaMessage],
+ fetchThreshold: Int,
+ fetchSleepMS: Long) extends Thread {
+ import io.gearpump.streaming.kafka.lib.consumer.FetchThread._
private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
index 208a99d..77321b9 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,6 @@
package io.gearpump.streaming.kafka.lib.consumer
-import io.gearpump.streaming.kafka.lib.KafkaUtil
import kafka.api.{FetchRequestBuilder, OffsetRequest}
import kafka.common.ErrorMapping._
import kafka.common.TopicAndPartition
@@ -26,8 +25,11 @@ import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import kafka.message.MessageAndOffset
import kafka.utils.Utils
+import io.gearpump.streaming.kafka.lib.KafkaUtil
+
object KafkaConsumer {
- def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig): KafkaConsumer = {
+ def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig)
+ : KafkaConsumer = {
val connectZk = KafkaUtil.connectZookeeper(config)
val broker = KafkaUtil.getBroker(connectZk(), topic, partition)
val soTimeout = config.socketTimeoutMs
@@ -40,7 +42,7 @@ object KafkaConsumer {
.addFetch(topic, partition, offset, fetchSize)
.build()
- val response = consumer.fetch(request)
+ val response = consumer.fetch(request)
response.errorCode(topic, partition) match {
case NoError => response.messageSet(topic, partition).iterator
case error => throw exceptionFor(error)
@@ -51,13 +53,14 @@ object KafkaConsumer {
}
/**
- * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over messages from a kafka kafka.common.TopicAndPartition.
+ * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over
+ * messages from a kafka kafka.common.TopicAndPartition.
*/
class KafkaConsumer(consumer: SimpleConsumer,
- topic: String,
- partition: Int,
- getIterator: (Long) => Iterator[MessageAndOffset],
- startOffsetTime: Long = OffsetRequest.EarliestTime) {
+ topic: String,
+ partition: Int,
+ getIterator: (Long) => Iterator[MessageAndOffset],
+ startOffsetTime: Long = OffsetRequest.EarliestTime) {
private val earliestOffset = consumer
.earliestOrLatestOffset(TopicAndPartition(topic, partition), startOffsetTime, -1)
private var nextOffset: Long = earliestOffset
@@ -97,6 +100,4 @@ class KafkaConsumer(consumer: SimpleConsumer,
def close(): Unit = {
consumer.close()
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
index 18d1bf0..16330ed 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -28,9 +28,11 @@ import kafka.common.TopicAndPartition
* @param msg message payload
*/
case class KafkaMessage(topicAndPartition: TopicAndPartition, offset: Long,
- key: Option[Array[Byte]], msg: Array[Byte]) {
+ key: Option[Array[Byte]], msg: Array[Byte]) {
+
def this(topic: String, partition: Int, offset: Long,
- key: Option[Array[Byte]], msg: Array[Byte]) =
+ key: Option[Array[Byte]], msg: Array[Byte]) = {
this(TopicAndPartition(topic, partition), offset, key, msg)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
index 4a384fc..0f968e2 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -30,7 +30,9 @@ import kafka.common.TopicAndPartition
* streamProducer1 gets (topicA, partition2), (topicB, partition2)
*/
class KafkaDefaultGrouper extends KafkaGrouper {
- def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition] = {
- topicAndPartitions.indices.filter(_ % taskNum == taskIndex).map(i => topicAndPartitions(i)).toArray
+ def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+ : Array[TopicAndPartition] = {
+ topicAndPartitions.indices.filter(_ % taskNum == taskIndex)
+ .map(i => topicAndPartitions(i)).toArray
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
index 035917b..6660a04 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,7 +24,7 @@ import kafka.common.TopicAndPartition
* this class dispatches kafka kafka.common.TopicAndPartition to gearpump tasks
*/
trait KafkaGrouper {
- def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition]
+ def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+ : Array[TopicAndPartition]
}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
index 86bc099..2b00414 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,8 +19,6 @@
package io.gearpump.streaming.kafka
import com.twitter.bijection.Injection
-import io.gearpump.Message
-import io.gearpump.streaming.MockUtil
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.mockito.Mockito._
import org.scalacheck.Gen
@@ -28,6 +26,9 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
+import io.gearpump.Message
+import io.gearpump.streaming.MockUtil
+
class KafkaSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
val dataGen = for {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
index e67b572..6cd78fc 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,10 @@
package io.gearpump.streaming.kafka
+import scala.util.{Failure, Success}
+
import com.twitter.bijection.Injection
-import io.gearpump.streaming.kafka.lib.{KafkaSourceConfig, KafkaOffsetManager}
-import io.gearpump.streaming.kafka.lib.consumer.{KafkaMessage, FetchThread}
-import io.gearpump.streaming.transaction.api.{OffsetStorageFactory, TimeStampFilter, MessageDecoder, OffsetStorage}
import kafka.common.TopicAndPartition
-import io.gearpump.Message
-import io.gearpump.streaming.kafka.lib.consumer.FetchThread
-import io.gearpump.streaming.kafka.lib.KafkaOffsetManager
-import OffsetStorage.StorageEmpty
-import io.gearpump.streaming.transaction.api.OffsetStorageFactory
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalacheck.Gen
@@ -35,7 +29,11 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-import scala.util.{Failure, Success}
+import io.gearpump.Message
+import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage}
+import io.gearpump.streaming.kafka.lib.{KafkaOffsetManager, KafkaSourceConfig}
+import io.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
+import io.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter}
class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
@@ -50,8 +48,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
val timestampFilter = mock[TimeStampFilter]
val offsetStorageFactory = mock[OffsetStorageFactory]
val kafkaConfig = mock[KafkaSourceConfig]
- val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter,
- Some(fetchThread), Map(topicAndPartition -> offsetManager))
+ val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+ timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
kafkaSource.setStartTime(None)
@@ -68,8 +66,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
val timestampFilter = mock[TimeStampFilter]
val offsetStorageFactory = mock[OffsetStorageFactory]
val kafkaConfig = mock[KafkaSourceConfig]
- val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter,
- Some(fetchThread), Map(topicAndPartition -> offsetManager))
+ val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+ timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(StorageEmpty))
@@ -95,8 +93,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
val timestampFilter = mock[TimeStampFilter]
val offsetStorageFactory = mock[OffsetStorageFactory]
val kafkaConfig = mock[KafkaSourceConfig]
- val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter,
- Some(fetchThread), Map(topicAndPartition -> offsetManager))
+ val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+ timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
when(offsetManager.resolveOffset(startTime)).thenReturn(Success(offset))
@@ -135,7 +133,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
val kafkaConfig = mock[KafkaSourceConfig]
val offsetManagers = kafkaMsgList.map(_.topicAndPartition -> offsetManager).toMap
- val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter, Some(fetchThread), offsetManagers)
+ val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+ timestampFilter, Some(fetchThread), offsetManagers)
if (number == 0) {
verify(fetchThread, never()).poll
@@ -171,10 +170,9 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
val messageDecoder = mock[MessageDecoder]
val offsetStorageFactory = mock[OffsetStorageFactory]
val kafkaConfig = mock[KafkaSourceConfig]
- val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter, Some(fetchThread),
- Map(topicAndPartition -> offsetManager))
+ val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+ timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
source.close()
verify(offsetManager).close()
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
index 2453adf..f3b5425 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,8 +20,8 @@ package io.gearpump.streaming.kafka.lib
import com.twitter.bijection.Injection
import org.scalacheck.Gen
-import org.scalatest.{PropSpec, Matchers}
import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
property("DefaultMessageDecoder should keep the original bytes data in Message") {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
index e185a29..c762b06 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,18 @@
package io.gearpump.streaming.kafka.lib
+import scala.util.{Failure, Success}
+
import com.twitter.bijection.Injection
-import io.gearpump.streaming.transaction.api.OffsetStorage
-import io.gearpump.Message
-import OffsetStorage.{Overflow, StorageEmpty, Underflow}
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-import scala.util.{Failure, Success}
+import io.gearpump.Message
+import io.gearpump.streaming.transaction.api.OffsetStorage
+import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
@@ -40,11 +41,12 @@ class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers
val messageAndOffsetsGen = Gen.listOf[Message](messageGen).map(_.zipWithIndex)
- property("KafkaOffsetManager should append offset to storage in monotonically increasing time order") {
+ property("KafkaOffsetManager should append offset to storage in monotonically" +
+ " increasing time order") {
forAll(messageAndOffsetsGen) { (messageAndOffsets: List[(Message, Int)]) =>
val offsetStorage = mock[OffsetStorage]
val offsetManager = new KafkaOffsetManager(offsetStorage)
- messageAndOffsets.foldLeft(0L){ (max, messageAndOffset) =>
+ messageAndOffsets.foldLeft(0L) { (max, messageAndOffset) =>
val (message, offset) = messageAndOffset
offsetManager.filter((message, offset.toLong)) shouldBe Option(message)
if (message.timestamp > max) {
@@ -62,7 +64,8 @@ class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers
val minTimeStampGen = Gen.choose[Long](0L, 500L)
val maxTimeStampGen = Gen.choose[Long](500L, 1000L)
- property("KafkaOffsetManager resolveOffset should report StorageEmpty failure when storage is empty") {
+ property("KafkaOffsetManager resolveOffset should " +
+ "report StorageEmpty failure when storage is empty") {
forAll(timeStampGen) { (time: Long) =>
val offsetStorage = mock[OffsetStorage]
val offsetManager = new KafkaOffsetManager(offsetStorage)
@@ -78,16 +81,19 @@ class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers
}
val offsetGen = Gen.choose[Long](0L, 1000L)
- property("KafkaOffsetManager resolveOffset should return a valid offset when storage is not empty") {
+ property("KafkaOffsetManager resolveOffset should return a valid" +
+ " offset when storage is not empty") {
forAll(timeStampGen, minTimeStampGen, maxTimeStampGen, offsetGen) {
(time: Long, min: Long, max: Long, offset: Long) =>
val offsetStorage = mock[OffsetStorage]
val offsetManager = new KafkaOffsetManager(offsetStorage)
if (time < min) {
- when(offsetStorage.lookUp(time)).thenReturn(Failure(Underflow(Injection[Long, Array[Byte]](min))))
+ when(offsetStorage.lookUp(time)).thenReturn(Failure(
+ Underflow(Injection[Long, Array[Byte]](min))))
offsetManager.resolveOffset(time) shouldBe Success(min)
} else if (time > max) {
- when(offsetStorage.lookUp(time)).thenReturn(Failure(Overflow(Injection[Long, Array[Byte]](max))))
+ when(offsetStorage.lookUp(time)).thenReturn(Failure(
+ Overflow(Injection[Long, Array[Byte]](max))))
offsetManager.resolveOffset(time) shouldBe Success(max)
} else {
when(offsetStorage.lookUp(time)).thenReturn(Success(Injection[Long, Array[Byte]](offset)))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
index bcc757b..af23c12 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,163 +18,170 @@
package io.gearpump.streaming.kafka.lib
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- val minTimeGen = Gen.choose[Long](1L, 500L)
- val maxTimeGen = Gen.choose[Long](500L, 999L)
-
- /* property("KafkaStorage lookup time should report StorageEmpty if storage is empty") {
- forAll { (time: Long, topic: String) =>
- val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
- val getConsumer = () => mock[KafkaConsumer]
- val connectZk = () => mock[ZkClient]
- val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(), connectZk())
- storage.lookUp(time) shouldBe Failure(StorageEmpty)
- }
- }
-
- property("KafkaStorage lookup time should return data or report failure if storage is not empty") {
- forAll(minTimeGen, maxTimeGen, Gen.alphaStr) { (minTime: Long, maxTime: Long, topic: String) =>
- val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) =>
- val offset = index.toLong
- time -> offset
- }
- val timeAndOffsetsMap = timeAndOffsets.toMap
- val data = timeAndOffsets.map {
- case (time, offset) =>
- new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)),
- Injection[Long, Array[Byte]](offset))
- }.toList
-
- val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
- val consumer = mock[KafkaConsumer]
- val getConsumer = () => consumer
- val connectZk = () => mock[ZkClient]
-
- val hasNexts = List.fill(data.tail.size)(true) :+ false
- when(consumer.hasNext).thenReturn(true, hasNexts:_*)
- when(consumer.next).thenReturn(data.head, data.tail:_*)
-
- val storage = new KafkaStorage(topic, topicExists = true, producer, getConsumer(), connectZk())
- forAll(Gen.choose[Long](minTime, maxTime)) {
- time =>
- storage.lookUp(time) match {
- case Success(array) => array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time)))
- case Failure(e) => fail("time in range should return Success with value")
- }
- }
-
- forAll(Gen.choose[Long](0L, minTime - 1)) {
- time =>
- storage.lookUp(time) match {
- case Failure(e) => e shouldBe a [Underflow]
- e.asInstanceOf[Underflow].min should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime)))
- case Success(_) => fail("time less than min should return Underflow failure")
- }
- }
-
- forAll(Gen.choose[Long](maxTime + 1, 1000L)) {
- time =>
- storage.lookUp(time) match {
- case Failure(e) => e shouldBe a [Overflow]
- e.asInstanceOf[Overflow].max should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime)))
- case Success(_) => fail("time larger than max should return Overflow failure")
- }
- }
- }
- }
-
- property("KafkaStorage append should send data to Kafka") {
- forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000), Gen.alphaStr, Gen.oneOf(true, false)) {
- (time: Long, offset: Long, topic: String, topicExists: Boolean) =>
- val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
- val getConsumer = () => mock[KafkaConsumer]
- val connectZk = () => mock[ZkClient]
- val storage = new KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk())
- val offsetBytes = Injection[Long, Array[Byte]](offset)
- storage.append(time, offsetBytes)
- verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]())
- }
- }
-
- val topicAndPartitionGen = for {
- topic <- Gen.alphaStr
- partition <- Gen.choose[Int](0, 100)
- } yield TopicAndPartition(topic, partition)
- property("KafkaStorage should load data from Kafka") {
- val kafkaMsgGen = for {
- timestamp <- Gen.choose[Long](1L, 1000L)
- offset <- Gen.choose[Long](0L, 1000L)
- } yield (timestamp, Injection[Long, Array[Byte]](offset))
- val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen)
-
- val topicExistsGen = Gen.oneOf(true, false)
-
- forAll(topicAndPartitionGen, msgListGen) {
- (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) =>
- val producer= mock[KafkaProducer[Array[Byte], Array[Byte]]]
- val consumer = mock[KafkaConsumer]
- val getConsumer = () => consumer
- val connectZk = () => mock[ZkClient]
- val kafkaStorage = new KafkaStorage(topicAndPartition.topic, topicExists = true, producer, getConsumer(), connectZk())
- msgList match {
- case Nil =>
- when(consumer.hasNext).thenReturn(false)
- case list =>
- val hasNexts = List.fill(list.tail.size)(true) :+ false
- val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) =>
- KafkaMessage(topicAndPartition, index.toLong, Some(Injection[Long, Array[Byte]](timestamp)), bytes)
- }
- when(consumer.hasNext).thenReturn(true, hasNexts: _*)
- when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*)
- }
- kafkaStorage.load(consumer) shouldBe msgList
- }
- }
-
- property("KafkaStorage should not get consumer when topic doesn't exist") {
- forAll(Gen.alphaStr) { (topic: String) =>
- val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
- val getConsumer = mock[() => KafkaConsumer]
- val connectZk = () => mock[ZkClient]
- val kafkaStorage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(), connectZk())
- verify(getConsumer, never()).apply()
- kafkaStorage.close()
- }
- }
-
- property("KafkaStorage should fail to load invalid KafkaMessage") {
- val invalidKafkaMsgGen = for {
- tp <- topicAndPartitionGen
- offset <- Gen.choose[Long](1L, 1000L)
- timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))), None)
- msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
- } yield KafkaMessage(tp, offset, timestamp, msg)
- forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) =>
- val consumer = mock[KafkaConsumer]
- val getConsumer = () => consumer
- val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
- val connectZk = () => mock[ZkClient]
- val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic, topicExists = true,
- producer, getConsumer(), connectZk())
- when(consumer.hasNext).thenReturn(true, false)
- when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg)
- Try(kafkaStorage.load(consumer)).isFailure shouldBe true
- }
- }
-
- property("KafkaStorage close should close kafka producer and delete topic") {
- val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
- val getConsumer = () => mock[KafkaConsumer]
- val zkClient = mock[ZkClient]
- val connectZk = () => zkClient
- val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk())
- kafkaStorage.close()
- verify(producer).close()
- verify(zkClient).createPersistent(anyString(), anyString())
- }*/
-}
+// TODO: Fix the failing unit tests below and re-enable this spec.
+
+// class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+// val minTimeGen = Gen.choose[Long](1L, 500L)
+// val maxTimeGen = Gen.choose[Long](500L, 999L)
+//
+// property("KafkaStorage lookup time should report StorageEmpty if storage is empty") {
+// forAll { (time: Long, topic: String) =>
+// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+// val getConsumer = () => mock[KafkaConsumer]
+// val connectZk = () => mock[ZkClient]
+// val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(),
+// connectZk())
+// storage.lookUp(time) shouldBe Failure(StorageEmpty)
+// }
+// }
+//
+// property("KafkaStorage lookup time should return data or report failure if storage not empty") {
+// forAll(minTimeGen, maxTimeGen, Gen.alphaStr) {(minTime: Long, maxTime: Long, topic: String) =>
+// val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) =>
+// val offset = index.toLong
+// time -> offset
+// }
+// val timeAndOffsetsMap = timeAndOffsets.toMap
+// val data = timeAndOffsets.map {
+// case (time, offset) =>
+// new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)),
+// Injection[Long, Array[Byte]](offset))
+// }.toList
+//
+// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+// val consumer = mock[KafkaConsumer]
+// val getConsumer = () => consumer
+// val connectZk = () => mock[ZkClient]
+//
+// val hasNexts = List.fill(data.tail.size)(true) :+ false
+// when(consumer.hasNext).thenReturn(true, hasNexts:_*)
+// when(consumer.next).thenReturn(data.head, data.tail:_*)
+//
+// val storage = new KafkaStorage(topic, topicExists = true, producer,
+// getConsumer(), connectZk())
+// forAll(Gen.choose[Long](minTime, maxTime)) {
+// time =>
+// storage.lookUp(time) match {
+// case Success(array) =>
+// array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time)))
+// case Failure(e) => fail("time in range should return Success with value")
+// }
+// }
+//
+// forAll(Gen.choose[Long](0L, minTime - 1)) {
+// time =>
+// storage.lookUp(time) match {
+// case Failure(e) => e shouldBe a [Underflow]
+// e.asInstanceOf[Underflow].min should equal(
+// Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime)))
+// case Success(_) => fail("time less than min should return Underflow failure")
+// }
+// }
+//
+// forAll(Gen.choose[Long](maxTime + 1, 1000L)) {
+// time =>
+// storage.lookUp(time) match {
+// case Failure(e) => e shouldBe a [Overflow]
+// e.asInstanceOf[Overflow].max should equal(
+// Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime)))
+// case Success(_) => fail("time larger than max should return Overflow failure")
+// }
+// }
+// }
+// }
+//
+// property("KafkaStorage append should send data to Kafka") {
+// forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000),
+// Gen.alphaStr, Gen.oneOf(true, false)) {
+// (time: Long, offset: Long, topic: String, topicExists: Boolean) =>
+// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+// val getConsumer = () => mock[KafkaConsumer]
+// val connectZk = () => mock[ZkClient]
+// val storage = new KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk())
+// val offsetBytes = Injection[Long, Array[Byte]](offset)
+// storage.append(time, offsetBytes)
+// verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]())
+// }
+// }
+//
+// val topicAndPartitionGen = for {
+// topic <- Gen.alphaStr
+// partition <- Gen.choose[Int](0, 100)
+// } yield TopicAndPartition(topic, partition)
+// property("KafkaStorage should load data from Kafka") {
+// val kafkaMsgGen = for {
+// timestamp <- Gen.choose[Long](1L, 1000L)
+// offset <- Gen.choose[Long](0L, 1000L)
+// } yield (timestamp, Injection[Long, Array[Byte]](offset))
+// val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen)
+//
+// val topicExistsGen = Gen.oneOf(true, false)
+//
+// forAll(topicAndPartitionGen, msgListGen) {
+// (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) =>
+// val producer= mock[KafkaProducer[Array[Byte], Array[Byte]]]
+// val consumer = mock[KafkaConsumer]
+// val getConsumer = () => consumer
+// val connectZk = () => mock[ZkClient]
+// val kafkaStorage = new KafkaStorage(topicAndPartition.topic,
+// topicExists = true, producer, getConsumer(), connectZk())
+// msgList match {
+// case Nil =>
+// when(consumer.hasNext).thenReturn(false)
+// case list =>
+// val hasNexts = List.fill(list.tail.size)(true) :+ false
+// val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) =>
+// KafkaMessage(topicAndPartition, index.toLong,
+// Some(Injection[Long, Array[Byte]](timestamp)), bytes)
+// }
+// when(consumer.hasNext).thenReturn(true, hasNexts: _*)
+// when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*)
+// }
+// kafkaStorage.load(consumer) shouldBe msgList
+// }
+// }
+//
+// property("KafkaStorage should not get consumer when topic doesn't exist") {
+// forAll(Gen.alphaStr) { (topic: String) =>
+// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+// val getConsumer = mock[() => KafkaConsumer]
+// val connectZk = () => mock[ZkClient]
+// val kafkaStorage = new KafkaStorage(topic,
+// topicExists = false, producer, getConsumer(), connectZk())
+// verify(getConsumer, never()).apply()
+// kafkaStorage.close()
+// }
+// }
+//
+// property("KafkaStorage should fail to load invalid KafkaMessage") {
+// val invalidKafkaMsgGen = for {
+// tp <- topicAndPartitionGen
+// offset <- Gen.choose[Long](1L, 1000L)
+// timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))),
+// None)
+// msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
+// } yield KafkaMessage(tp, offset, timestamp, msg)
+// forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) =>
+// val consumer = mock[KafkaConsumer]
+// val getConsumer = () => consumer
+// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+// val connectZk = () => mock[ZkClient]
+// val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic,
+// topicExists = true, producer, getConsumer(), connectZk())
+// when(consumer.hasNext).thenReturn(true, false)
+// when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg)
+// Try(kafkaStorage.load(consumer)).isFailure shouldBe true
+// }
+// }
+//
+// property("KafkaStorage close should close kafka producer and delete topic") {
+// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+// val getConsumer = () => mock[KafkaConsumer]
+// val zkClient = mock[ZkClient]
+// val connectZk = () => zkClient
+// val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk())
+// kafkaStorage.close()
+// verify(producer).close()
+// verify(zkClient).createPersistent(anyString(), anyString())
+// }
+// }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
index 24c1c5d..7447308 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,34 +18,36 @@
package io.gearpump.streaming.kafka.lib
-import io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper
-import io.gearpump.streaming.kafka.util.KafkaServerHarness
import kafka.common.TopicAndPartition
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.server.{KafkaConfig => KafkaServerConfig}
+import kafka.utils.{TestUtils, TestZKUtils}
import org.scalatest.prop.PropertyChecks
-import org.scalatest.{PropSpec, Matchers, BeforeAndAfterEach}
+import org.scalatest.{BeforeAndAfterEach, Matchers, PropSpec}
-import kafka.server.{KafkaConfig => KafkaServerConfig}
+import io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper
+import io.gearpump.streaming.kafka.util.KafkaServerHarness
+class KafkaUtilSpec
+ extends PropSpec with PropertyChecks with BeforeAndAfterEach
+ with Matchers with KafkaServerHarness {
-class KafkaUtilSpec extends PropSpec with PropertyChecks with BeforeAndAfterEach with Matchers with KafkaServerHarness {
val numServers = 1
override val configs: List[KafkaServerConfig] =
- for(props <- TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false))
- yield new KafkaServerConfig(props) {
- override val zkConnect = TestZKUtils.zookeeperConnect
- override val numPartitions = 4
- }
+ for (props <- TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false))
+ yield new KafkaServerConfig(props) {
+ override val zkConnect = TestZKUtils.zookeeperConnect
+ override val numPartitions = 4
+ }
- override def beforeEach: Unit = {
+ override def beforeEach(): Unit = {
super.setUp()
}
- override def afterEach: Unit = {
+ override def afterEach(): Unit = {
super.tearDown()
}
- import KafkaUtil._
+ import io.gearpump.streaming.kafka.lib.KafkaUtil._
property("KafkaUtil should be able to create topic") {
val args = {
@@ -91,13 +93,15 @@ class KafkaUtilSpec extends PropSpec with PropertyChecks with BeforeAndAfterEach
property("KafkaUtil should be able to get TopicAndPartitions info and group with KafkaGrouper") {
val grouper: KafkaGrouper = new KafkaGrouper {
- override def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition] =
+ override def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+ : Array[TopicAndPartition] = {
topicAndPartitions
+ }
}
val topicNum = 3
val topics = List.fill(topicNum)(TestUtils.tempTopic())
topics.foreach(t => createTopicUntilLeaderIsElected(t, partitions = 1, replicas = 1))
- KafkaUtil.getTopicAndPartitions(connectZk(), topics).toSet shouldBe topics.map(t => TopicAndPartition(t, 0)).toSet
+ KafkaUtil.getTopicAndPartitions(connectZk(), topics).toSet shouldBe
+ topics.map(t => TopicAndPartition(t, 0)).toSet
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
index a20e575..3983874 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,20 +7,18 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package io.gearpump.streaming.kafka.lib.consumer
-import org.scalatest.{WordSpec, Matchers}
+import org.scalatest.{Matchers, WordSpec}
class ExponentialBackoffSleeperSpec extends WordSpec with Matchers {
@@ -69,4 +66,3 @@ class ExponentialBackoffSleeperSpec extends WordSpec with Matchers {
}
}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
index 92d3c31..d955dfa 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -35,14 +35,14 @@ class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with Mo
property("FetchThread should set startOffset to iterators") {
forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) {
(fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) =>
- val topicAndPartition = mock[TopicAndPartition]
- val consumer = mock[KafkaConsumer]
- val createConsumer = (tp: TopicAndPartition) => consumer
- val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
- val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer,
- incomingQueue, fetchThreshold, fetchSleepMS)
- fetchThread.setStartOffset(topicAndPartition, startOffset)
- verify(consumer).setStartOffset(startOffset)
+ val topicAndPartition = mock[TopicAndPartition]
+ val consumer = mock[KafkaConsumer]
+ val createConsumer = (tp: TopicAndPartition) => consumer
+ val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+ val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer,
+ incomingQueue, fetchThreshold, fetchSleepMS)
+ fetchThread.setStartOffset(topicAndPartition, startOffset)
+ verify(consumer).setStartOffset(startOffset)
}
}
@@ -50,10 +50,11 @@ class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with Mo
topic <- Gen.alphaStr
partition <- Gen.choose[Int](0, Int.MaxValue)
} yield TopicAndPartition(topic, partition)
- property("FetchThread should only fetchMessage when the number of messages in queue is below the threshold") {
+ property("FetchThread should only fetchMessage when the number " +
+ "of messages in queue is below the threshold") {
forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) {
(messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int,
- startOffset: Long, topicAndPartition: TopicAndPartition) =>
+ startOffset: Long, topicAndPartition: TopicAndPartition) =>
val message = mock[KafkaMessage]
val consumer = mock[KafkaConsumer]
val createConsumer = (tp: TopicAndPartition) => consumer
@@ -87,8 +88,12 @@ class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with Mo
tp <- topicAndPartitionGen
hasNext <- Gen.oneOf(true, false)
} yield (tp, hasNext)
- val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen).map(_.toMap) suchThat (_.nonEmpty)
- property("FetchThread fetchMessage should return false when there are no more messages from any TopicAndPartition") {
+
+ val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen)
+ .map(_.toMap) suchThat (_.nonEmpty)
+
+ property("FetchThread fetchMessage should return false when there are no more messages " +
+ "from any TopicAndPartition") {
forAll(tpHasNextMapGen, nonNegativeGen) {
(tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) =>
val createConsumer = (tp: TopicAndPartition) => {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
index 2669f60..524fc9b 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,