Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:43 UTC

[33/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
index 968834e..3dede8e 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,21 +19,20 @@
 package io.gearpump.streaming.kafka
 
 import java.util.Properties
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success}
+
+import kafka.common.TopicAndPartition
+import org.slf4j.Logger
 
-import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig, KafkaUtil, KafkaOffsetManager}
 import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage}
+import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaOffsetManager, KafkaSourceConfig, KafkaUtil}
 import io.gearpump.streaming.source.DefaultTimeStampFilter
 import io.gearpump.streaming.task.TaskContext
+import io.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
 import io.gearpump.streaming.transaction.api._
-import kafka.common.TopicAndPartition
-import OffsetStorage.StorageEmpty
 import io.gearpump.util.LogUtil
 import io.gearpump.{Message, TimeStamp}
-import org.slf4j.Logger
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Success}
-
 
 object KafkaSource {
   private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource])
@@ -44,15 +43,19 @@ object KafkaSource {
  * from multiple Kafka TopicAndPartition in a round-robin way.
  *
  * This is a TimeReplayableSource which is able to replay messages given a start time.
- * Each kafka message is tagged with a timestamp by [[io.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp) mapping
- * is stored to a [[OffsetStorage]]. On recovery, we could retrieve the previously stored offset
- * from the [[OffsetStorage]] by timestamp and start to read from there.
+ * Each kafka message is tagged with a timestamp by
+ * [[io.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp) mapping
+ * is stored to a [[io.gearpump.streaming.transaction.api.OffsetStorage]]. On recovery,
+ * we could retrieve the previously stored offset from the
+ * [[io.gearpump.streaming.transaction.api.OffsetStorage]] by timestamp and start to read from
+ * there.
  *
- * kafka message is wrapped into gearpump [[Message]] and further filtered by a [[TimeStampFilter]]
+ * Each kafka message is wrapped into a gearpump [[io.gearpump.Message]] and further
+ * filtered by a [[io.gearpump.streaming.transaction.api.TimeStampFilter]]
  * such that obsolete messages are dropped.
  *
  * @param config kafka source config
- * @param messageDecoder decodes [[Message]] from raw bytes
+ * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes
  * @param timestampFilter filters out message based on timestamp
  * @param fetchThread fetches messages and puts on a in-memory queue
  * @param offsetManagers manages offset-to-timestamp storage for each kafka.common.TopicAndPartition
@@ -63,59 +66,66 @@ class KafkaSource(
     messageDecoder: MessageDecoder = new DefaultMessageDecoder,
     timestampFilter: TimeStampFilter = new DefaultTimeStampFilter,
     private var fetchThread: Option[FetchThread] = None,
-    private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = Map.empty[TopicAndPartition, KafkaOffsetManager])
-  extends TimeReplayableSource {
-  import KafkaSource._
+    private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = {
+      Map.empty[TopicAndPartition, KafkaOffsetManager]
+    }) extends TimeReplayableSource {
+  import io.gearpump.streaming.kafka.KafkaSource._
 
   private var startTime: Option[TimeStamp] = None
 
-
   /**
+   * Constructs a Kafka Source by...
+   *
    * @param topics comma-separated string of topics
    * @param properties kafka consumer config
    * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
-   *                            that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
+   *   that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
    *
    */
   def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory) = {
     this(KafkaSourceConfig(properties).withConsumerTopics(topics), offsetStorageFactory)
   }
   /**
+   * Constructs a Kafka Source by...
+   *
    * @param topics comma-separated string of topics
    * @param properties kafka consumer config
    * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
-   *                            that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
-   * @param messageDecoder decodes [[Message]] from raw bytes
+   *   that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
+   * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes
    * @param timestampFilter filters out message based on timestamp
    */
   def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory,
-           messageDecoder: MessageDecoder,
-           timestampFilter: TimeStampFilter) = {
+      messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter) = {
     this(KafkaSourceConfig(properties)
       .withConsumerTopics(topics), offsetStorageFactory,
       messageDecoder, timestampFilter)
   }
 
   /**
+   * Constructs a Kafka Source by...
+   *
    * @param topics comma-separated string of topics
    * @param zkConnect kafka consumer config `zookeeper.connect`
    * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
-   *                            that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
+   *   that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
    */
   def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory) =
     this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory)
 
   /**
+   * Constructs a Kafka Source by...
+   *
    * @param topics comma-separated string of topics
    * @param zkConnect kafka consumer config `zookeeper.connect`
    * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
-   *                            that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
-   * @param messageDecoder decodes [[Message]] from raw bytes
+   *   that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
+   * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes
    * @param timestampFilter filters out message based on timestamp
    */
   def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory,
-           messageDecoder: MessageDecoder,
-           timestampFilter: TimeStampFilter) = {
+      messageDecoder: MessageDecoder,
+      timestampFilter: TimeStampFilter) = {
     this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory,
       messageDecoder, timestampFilter)
   }
@@ -191,5 +201,4 @@ class KafkaSource(
   override def close(): Unit = {
     offsetManagers.foreach(_._2.close())
   }
-
 }
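
For reference, a minimal sketch of wiring up the refactored source above. The endpoints
and topic are placeholder values; KafkaStorageFactory checkpoints offsets back to kafka:

    import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}

    // offsets are stored to kafka itself via the storage factory
    val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092")
    // read "topic1" through the ZooKeeper ensemble at localhost:2181
    val source = new KafkaSource("topic1", "localhost:2181", offsetStorageFactory)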

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
index eacd267..e50bf84 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,36 +19,37 @@
 package io.gearpump.streaming.kafka
 
 import java.util.Properties
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
 
 import com.twitter.bijection.Injection
-import io.gearpump.streaming.kafka.lib.KafkaUtil
-import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
-import io.gearpump.streaming.transaction.api.{OffsetStorageFactory, OffsetStorage}
 import kafka.api.OffsetRequest
 import kafka.consumer.ConsumerConfig
 import org.I0Itec.zkclient.ZkClient
-import io.gearpump.TimeStamp
-import OffsetStorage.{Overflow, StorageEmpty, Underflow}
-import io.gearpump.util.LogUtil
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.slf4j.Logger
 
-import scala.collection.mutable
-import scala.util.{Failure, Success, Try}
+import io.gearpump.TimeStamp
+import io.gearpump.streaming.kafka.lib.KafkaUtil
+import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
+import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
+import io.gearpump.streaming.transaction.api.{OffsetStorage, OffsetStorageFactory}
+import io.gearpump.util.LogUtil
 
 /**
- * factory that builds [[KafkaStorage]]
+ * Factory that builds [[KafkaStorage]]
  *
  * @param consumerProps kafka consumer config
  * @param producerProps kafka producer config
  */
-class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties) extends OffsetStorageFactory {
+class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties)
+  extends OffsetStorageFactory {
 
   /**
-   *
-   * this creates consumer config properties with `zookeeper.connect` set to zkConnect
+   * Creates consumer config properties with `zookeeper.connect` set to zkConnect
    * and producer config properties with `bootstrap.servers` set to bootstrapServers
+   *
    * @param zkConnect kafka consumer config `zookeeper.connect`
    * @param bootstrapServers kafka producer config `bootstrap.servers`
    */
@@ -70,7 +71,8 @@ object KafkaStorage {
 }
 
 /**
- * this stores offset-timestamp mapping to kafka
+ * Stores offset-timestamp mapping to kafka
+ *
  * @param topic kafka store topic
  * @param producer kafka producer
  * @param getConsumer function to get kafka consumer
@@ -83,11 +85,10 @@ class KafkaStorage private[kafka](
     connectZk: => ZkClient)
   extends OffsetStorage {
 
-
   private lazy val consumer = getConsumer
 
   private val dataByTime: List[(TimeStamp, Array[Byte])] = {
-    if (KafkaUtil.topicExists(connectZk, topic)){
+    if (KafkaUtil.topicExists(connectZk, topic)) {
       load(consumer)
     } else {
       List.empty[(TimeStamp, Array[Byte])]
@@ -95,8 +96,9 @@ class KafkaStorage private[kafka](
   }
 
   /**
-   * offsets with timestamp < `time` have already been processed by the system
-   * so we look up the storage for the first offset with timestamp >= `time` on replay
+   * Offsets with timestamp less than `time` have already been processed by the system
+   * so we look up the storage for the first offset with timestamp greater than or
+   * equal to `time` on replay.
    *
    * @param time the timestamp to look up for the earliest unprocessed offset
    * @return the earliest unprocessed offset if `time` is in the range, otherwise failure
@@ -143,5 +145,4 @@ class KafkaStorage private[kafka](
     consumer.close()
     messagesBuilder.result().toList
   }
-
 }
\ No newline at end of file
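
The lookup semantics above can be summarized as follows. This is a hedged sketch: the
failure cases are inferred from the Overflow, StorageEmpty and Underflow imports in this
diff, and a lookup result of Try[Array[Byte]] is an assumption:

    import scala.util.{Failure, Success, Try}
    import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}

    def describe(result: Try[Array[Byte]]): String = result match {
      case Success(_)            => "first offset with timestamp >= `time` was found"
      case Failure(StorageEmpty) => "nothing has been checkpointed yet"
      case Failure(_: Underflow) => "`time` predates every stored record"
      case Failure(_: Overflow)  => "`time` is newer than every stored record"
      case Failure(e)            => s"unexpected failure: ${e.getMessage}"
    }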

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
index 5c700be..2a852e2 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,6 +24,8 @@ import io.gearpump.streaming.dsl
 import io.gearpump.streaming.kafka.KafkaSink
 
 class KafkaDSLSink[T](stream: dsl.Stream[T]) {
+
+  /** Create a Kafka DSL Sink */
   def writeToKafka(
       topic: String,
       bootstrapServers: String,
@@ -42,6 +44,9 @@ class KafkaDSLSink[T](stream: dsl.Stream[T]) {
 }
 
 object KafkaDSLSink {
+
+  import scala.language.implicitConversions
+
   implicit def streamToKafkaDSLSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] = {
     new KafkaDSLSink[T](stream)
   }
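
A hedged sketch of how the implicit conversion above is picked up; the remaining
writeToKafka parameters are cut off in this hunk, so the call itself is not spelled out:

    import io.gearpump.streaming.dsl
    import io.gearpump.streaming.kafka.dsl.KafkaDSLSink
    import io.gearpump.streaming.kafka.dsl.KafkaDSLSink.streamToKafkaDSLSink

    // with the implicit in scope, any dsl.Stream[T] gains writeToKafka
    def asKafkaSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] = streamToKafkaDSLSink(stream)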

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
index 7f752e5..325b40f 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,10 +20,10 @@ package io.gearpump.streaming.kafka.dsl
 import java.util.Properties
 
 import io.gearpump.streaming.dsl
-import io.gearpump.streaming.dsl.{StreamApp}
+import io.gearpump.streaming.dsl.StreamApp
 import io.gearpump.streaming.kafka.KafkaSource
 import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig}
-import io.gearpump.streaming.transaction.api.{OffsetStorageFactory, TimeStampFilter, MessageDecoder}
+import io.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter}
 
 object KafkaDSLUtil {
   def createStream[T](
@@ -33,7 +33,8 @@ object KafkaDSLUtil {
       kafkaConfig: KafkaSourceConfig,
       offsetStorageFactory: OffsetStorageFactory,
       messageDecoder: MessageDecoder = new DefaultMessageDecoder): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder), parallelism, description)
+    app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder),
+      parallelism, description)
   }
 
   def createStream[T](
@@ -43,8 +44,8 @@ object KafkaDSLUtil {
       topics: String,
       zkConnect: String,
       offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory)
-        , parallelism, description)
+    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory),
+      parallelism, description)
   }
 
   def createStream[T](
@@ -56,8 +57,8 @@ object KafkaDSLUtil {
       offsetStorageFactory: OffsetStorageFactory,
       messageDecoder: MessageDecoder,
       timestampFilter: TimeStampFilter): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory, messageDecoder, timestampFilter)
-        , parallelism, description)
+    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory,
+    messageDecoder, timestampFilter), parallelism, description)
   }
 
   def createStream[T](
@@ -67,7 +68,8 @@ object KafkaDSLUtil {
       topics: String,
       properties: Properties,
       offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, properties, offsetStorageFactory), parallelism, description)
+    app.source[T](new KafkaSource(topics, properties, offsetStorageFactory),
+    parallelism, description)
   }
 
   def createStream[T](
@@ -79,9 +81,8 @@ object KafkaDSLUtil {
       offsetStorageFactory: OffsetStorageFactory,
       messageDecoder: MessageDecoder,
       timestampFilter: TimeStampFilter): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, properties, offsetStorageFactory, messageDecoder, timestampFilter), parallelism, description)
+    app.source[T](new KafkaSource(topics, properties, offsetStorageFactory,
+    messageDecoder, timestampFilter), parallelism, description)
   }
 }
 
-
-
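
A usage sketch of the reformatted helpers above, using named arguments since the hunks
hide part of each signature; the `app` parameter name and the endpoints are assumptions:

    import io.gearpump.streaming.dsl.StreamApp
    import io.gearpump.streaming.kafka.KafkaStorageFactory
    import io.gearpump.streaming.kafka.dsl.KafkaDSLUtil

    def buildSource(app: StreamApp): Unit = {
      val factory = new KafkaStorageFactory("localhost:2181", "localhost:9092")
      KafkaDSLUtil.createStream[Array[Byte]](
        app = app,
        parallelism = 1,
        description = "kafka-source",
        topics = "topic1",
        zkConnect = "localhost:2181",
        offsetStorageFactory = factory)
    }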

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
index 873e614..f846efe 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,12 @@
 
 package io.gearpump.streaming.kafka.lib
 
+import scala.util.{Failure, Success}
+
 import com.twitter.bijection.Injection
-import io.gearpump.streaming.transaction.api.MessageDecoder
-import io.gearpump.Message
 
-import scala.util.{Failure, Success}
+import io.gearpump.Message
+import io.gearpump.streaming.transaction.api.MessageDecoder
 
 class DefaultMessageDecoder extends MessageDecoder {
   override def fromBytes(bytes: Array[Byte]): Message = {
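
A custom decoder follows the same shape as the reordered class above; a minimal sketch
(the timestamping scheme here is illustrative, not necessarily what DefaultMessageDecoder
does):

    import io.gearpump.Message
    import io.gearpump.streaming.transaction.api.MessageDecoder

    // keeps the raw bytes as the payload and stamps each message with arrival time
    class ArrivalTimeDecoder extends MessageDecoder {
      override def fromBytes(bytes: Array[Byte]): Message = {
        Message(bytes, System.currentTimeMillis())
      }
    }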

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
index ecaa294..e9c95e3 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,23 +18,24 @@
 
 package io.gearpump.streaming.kafka.lib
 
+import scala.util.{Failure, Success, Try}
+
 import com.twitter.bijection.Injection
-import io.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage}
-import io.gearpump._
-import OffsetStorage.{Overflow, StorageEmpty, Underflow}
-import io.gearpump.util.LogUtil
 import org.slf4j.Logger
 
-import scala.util.{Failure, Success, Try}
+import io.gearpump._
+import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
+import io.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage}
+import io.gearpump.util.LogUtil
 
 object KafkaOffsetManager {
   private val LOG: Logger = LogUtil.getLogger(classOf[KafkaOffsetManager])
 }
 
 private[kafka] class KafkaOffsetManager(storage: OffsetStorage) extends OffsetManager {
-  import KafkaOffsetManager._
+  import io.gearpump.streaming.kafka.lib.KafkaOffsetManager._
 
-  var maxTime: TimeStamp  = 0L
+  var maxTime: TimeStamp = 0L
 
   override def filter(messageAndOffset: (Message, Long)): Option[Message] = {
     val (message, offset) = messageAndOffset
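
The filter contract, per the spec further below, is that offsets are appended to storage
in monotonically increasing time order. A hedged sketch of that invariant with
illustrative names (not the class's actual private helpers):

    // only a timestamp that advances maxTime gets its offset appended,
    // keeping the (timestamp -> offset) mapping monotonic
    def appendIfAdvancing(maxTime: Long, timestamp: Long, offset: Long,
        append: (Long, Long) => Unit): Long = {
      if (timestamp > maxTime) {
        append(timestamp, offset)
        timestamp
      } else {
        maxTime // out-of-order timestamp: mapping unchanged
      }
    }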

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
index 50905fc..123f3ac 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,13 +20,13 @@ package io.gearpump.streaming.kafka.lib
 
 import java.util.Properties
 
-import io.gearpump.streaming.kafka.KafkaSource
-import io.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper}
-import io.gearpump.util.LogUtil
 import kafka.api.OffsetRequest
 import kafka.consumer.ConsumerConfig
 import org.slf4j.Logger
 
+import io.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper}
+import io.gearpump.util.LogUtil
+
 object KafkaSourceConfig {
 
   val NAME = "kafka_config"
@@ -45,12 +45,14 @@ object KafkaSourceConfig {
 }
 
 /**
- * this class extends kafka kafka.consumer.ConsumerConfig with specific configs for [[KafkaSource]]
+ * Extends kafka.consumer.ConsumerConfig with specific config needed by
+ * [[io.gearpump.streaming.kafka.KafkaSource]]
  *
  * @param consumerProps kafka consumer config
  */
-class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends java.io.Serializable  {
-  import KafkaSourceConfig._
+class KafkaSourceConfig(val consumerProps: Properties = new Properties)
+  extends java.io.Serializable {
+  import io.gearpump.streaming.kafka.lib.KafkaSourceConfig._
 
   if (!consumerProps.containsKey(ZOOKEEPER_CONNECT)) {
     consumerProps.setProperty(ZOOKEEPER_CONNECT, "localhost:2181")
@@ -63,9 +65,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
   def consumerConfig: ConsumerConfig = new ConsumerConfig(consumerProps)
 
   /**
-   * set kafka consumer topics
+   * Sets kafka consumer topics, separated by commas.
+   *
    * @param topics comma-separated string
-   * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.CONSUMER_TOPICS]] set to given value
+   * @return new KafkaConfig based on this but with
+   *         [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#CONSUMER_TOPICS]]
+   *         set to given value
    */
   def withConsumerTopics(topics: String): KafkaSourceConfig = {
     consumerProps.setProperty(CONSUMER_TOPICS, topics)
@@ -73,18 +78,22 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
   }
 
   /**
-   * @return a list of kafka consumer topics
+   * Returns a list of kafka consumer topics
    */
   def getConsumerTopics: List[String] = {
     Option(consumerProps.getProperty(CONSUMER_TOPICS)).getOrElse("topic1").split(",").toList
   }
 
   /**
-   * [[consumer.FetchThread]] will sleep for a while if no more messages or
-   * the incoming queue size is above the [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_THRESHOLD]]
-   * this is to set sleep interval
+   * Sets the sleep interval used when there are no more messages or the message buffer is full.
+   *
+   * Consumer.FetchThread will sleep for a while if no more messages or
+   * the incoming queue size is above the
+   * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]
+   *
    * @param sleepMS sleep interval in milliseconds
-   * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_SLEEP_MS]] set to given value
+   * @return new KafkaConfig based on this but with
+   *         [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_SLEEP_MS]] set to given value
    */
   def withFetchSleepMS(sleepMS: Int): KafkaSourceConfig = {
     consumerProps.setProperty(FETCH_SLEEP_MS, s"$sleepMS")
@@ -92,9 +101,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
   }
 
   /**
-   * [[consumer.FetchThread]] will sleep for a while if no more messages or
-   * the incoming queue size is above the [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_THRESHOLD]]
-   * this is to get sleep interval
+   * Gets the sleep interval
+   *
+   * Consumer.FetchThread sleeps for a while if there are no more messages or
+   * the incoming queue is full (size is bigger than the
+   * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]])
+   *
    * @return sleep interval in milliseconds
    */
   def getFetchSleepMS: Int = {
@@ -102,10 +114,14 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
   }
 
   /**
-   * [[consumer.FetchThread]] stops fetching new messages if its incoming queue
+   * Sets the fetch threshold.
+   *
+   * Consumer.FetchThread stops fetching new messages if its incoming queue
    * size is above the threshold and starts again when the queue size is below it
+   *
    * @param threshold queue size
-   * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_THRESHOLD]] set to give value
+   * @return new KafkaConfig based on this but with
+   *         [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] set to given value
    */
   def withFetchThreshold(threshold: Int): KafkaSourceConfig = {
     consumerProps.setProperty(FETCH_THRESHOLD, s"$threshold")
@@ -113,9 +129,11 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
   }
 
   /**
+   * Returns the fetch threshold.
+   *
+   * Consumer.FetchThread stops fetching new messages if its incoming queue size is
+   * above the threshold and starts again when the queue size is below it.
    *
-   * [[io.gearpump.streaming.kafka.lib.consumer.FetchThread]] stops fetching new messages if its incoming queue
-   * size is above the threshold and starts again when the queue size is below it
    * @return fetch threshold
    */
   def getFetchThreshold: Int = {
@@ -123,11 +141,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
   }
 
   /**
-   * set [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which
-   * defines how kafka.common.TopicAndPartitions are mapped to source tasks
+   * Sets [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which
+   * defines how kafka.common.TopicAndPartitions are mapped to source tasks.
    *
    * @param className name of the factory class
-   * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.GROUPER_CLASS]] set to given value
+   * @return new KafkaConfig based on this but with
+   *         [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#GROUPER_CLASS]] set to given value
    */
   def withGrouper(className: String): KafkaSourceConfig = {
     consumerProps.setProperty(GROUPER_CLASS, className)
@@ -135,13 +154,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
   }
 
   /**
-   * get [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which
+   * Returns [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which
    * defines how kafka.common.TopicAndPartitions are mapped to source tasks
-   * @return
    */
   def getGrouper: KafkaGrouper = {
-    Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS)).getOrElse(classOf[KafkaDefaultGrouper].getName))
-        .newInstance().asInstanceOf[KafkaGrouper]
+    Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS))
+      .getOrElse(classOf[KafkaDefaultGrouper].getName)).newInstance().asInstanceOf[KafkaGrouper]
   }
 
   def withConsumerStartOffset(earliestOrLatest: Long): KafkaSourceConfig = {
@@ -150,7 +168,8 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends
   }
 
   def getConsumerStartOffset: Long = {
-    Option(consumerProps.getProperty(CONSUMER_START_OFFSET)).getOrElse(s"${OffsetRequest.EarliestTime}").toLong
+    Option(consumerProps.getProperty(CONSUMER_START_OFFSET))
+      .getOrElse(s"${OffsetRequest.EarliestTime}").toLong
   }
 }
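
Since every setter above returns a new KafkaSourceConfig, the config chains fluently;
a short sketch with placeholder values:

    import java.util.Properties
    import io.gearpump.streaming.kafka.lib.KafkaSourceConfig

    val config = KafkaSourceConfig(new Properties)
      .withConsumerTopics("topic1,topic2")
      .withFetchSleepMS(100)
      .withFetchThreshold(10000)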
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
index 54a66ae..2f7fcf7 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,11 +27,12 @@ import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
 import kafka.utils.{ZKStringSerializer, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
-import io.gearpump.util.LogUtil
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
 import org.apache.kafka.common.serialization.Serializer
 import org.slf4j.Logger
 
+import io.gearpump.util.LogUtil
+
 object KafkaUtil {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
@@ -39,7 +40,8 @@ object KafkaUtil {
     val zkClient = connectZk
     try {
       val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
-        .getOrElse(throw new RuntimeException(s"leader not available for TopicAndPartition($topic, $partition)"))
+        .getOrElse(throw new RuntimeException(
+          s"leader not available for TopicAndPartition($topic, $partition)"))
       ZkUtils.getBrokerInfo(zkClient, leader)
         .getOrElse(throw new RuntimeException(s"broker info not found for leader $leader"))
     } catch {
@@ -51,12 +53,13 @@ object KafkaUtil {
     }
   }
 
-  def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String]): Array[TopicAndPartition] = {
+  def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String])
+    : Array[TopicAndPartition] = {
     val zkClient = connectZk
     try {
-        ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap {
-          case (topic, partitions) => partitions.map(TopicAndPartition(topic, _))
-        }.toArray
+      ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap {
+        case (topic, partitions) => partitions.map(TopicAndPartition(topic, _))
+      }.toArray
     } catch {
       case e: Exception =>
         LOG.error(e.getMessage)
@@ -80,10 +83,11 @@ object KafkaUtil {
   }
 
   /**
-   *  create a new kafka topic
-   *  return true if topic already exists, and false otherwise
+   * Creates a new kafka topic.
+   * Returns true if the topic already exists, and false otherwise.
    */
-  def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int): Boolean = {
+  def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int)
+    : Boolean = {
     val zkClient = connectZk
     try {
       if (AdminUtils.topicExists(zkClient, topic)) {
@@ -132,15 +136,16 @@ object KafkaUtil {
       case e: Exception =>
         LOG.error(s"$filename not found")
     } finally {
-      if(propStream != null)
+      if (propStream != null) {
         propStream.close()
+      }
     }
     props
   }
 
   def createKafkaProducer[K, V](properties: Properties,
-                                keySerializer: Serializer[K],
-                                valueSerializer: Serializer[V]): KafkaProducer[K, V] = {
+      keySerializer: Serializer[K],
+      valueSerializer: Serializer[V]): KafkaProducer[K, V] = {
     if (properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
       properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
     }
@@ -153,11 +158,10 @@ object KafkaUtil {
     properties
   }
 
- def buildConsumerConfig(zkConnect: String): Properties = {
-   val properties = new Properties()
-   properties.setProperty("zookeeper.connect", zkConnect)
-   properties.setProperty("group.id", "gearpump")
-   properties
+  def buildConsumerConfig(zkConnect: String): Properties = {
+    val properties = new Properties()
+    properties.setProperty("zookeeper.connect", zkConnect)
+    properties.setProperty("group.id", "gearpump")
+    properties
   }
-
 }
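
As the reindented method shows, buildConsumerConfig fills in only `zookeeper.connect`
and a default `group.id` of "gearpump"; a one-line usage sketch:

    import io.gearpump.streaming.kafka.lib.KafkaUtil

    val props = KafkaUtil.buildConsumerConfig("localhost:2181")
    assert(props.getProperty("group.id") == "gearpump")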

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
index b821f15..141ae98 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,15 +7,13 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package io.gearpump.streaming.kafka.lib.consumer

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
index ee53151..b2b3f4f 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,17 +23,18 @@ import java.util.concurrent.LinkedBlockingQueue
 
 import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
-import io.gearpump.util.LogUtil
 import org.slf4j.Logger
 
+import io.gearpump.util.LogUtil
+
 object FetchThread {
   private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread])
 
   def apply(topicAndPartitions: Array[TopicAndPartition],
-            fetchThreshold: Int,
-            fetchSleepMS: Long,
-            startOffsetTime: Long,
-            consumerConfig: ConsumerConfig): FetchThread = {
+      fetchThreshold: Int,
+      fetchSleepMS: Long,
+      startOffsetTime: Long,
+      consumerConfig: ConsumerConfig): FetchThread = {
     val createConsumer = (tp: TopicAndPartition) =>
       KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)
 
@@ -43,20 +44,22 @@ object FetchThread {
 }
 
 /**
- * A thread to fetch messages from multiple kafka [[TopicAndPartition]]s and puts them
+ * A thread that fetches messages from multiple kafka.common.TopicAndPartitions and puts them
  * onto a queue, which is asynchronously polled by a consumer
  *
- * @param createConsumer given a [[TopicAndPartition]], create a [[KafkaConsumer]] to connect to it
+ * @param createConsumer given a kafka.common.TopicAndPartition, creates a
+ *                       [[io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] to
+ *                       connect to it
  * @param incomingQueue a queue to buffer incoming messages
  * @param fetchThreshold above which thread should stop fetching messages
  * @param fetchSleepMS interval to sleep when no more messages or hitting fetchThreshold
  */
 private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
-                                 createConsumer: TopicAndPartition => KafkaConsumer,
-                                 incomingQueue: LinkedBlockingQueue[KafkaMessage],
-                                 fetchThreshold: Int,
-                                 fetchSleepMS: Long) extends Thread {
-  import FetchThread._
+    createConsumer: TopicAndPartition => KafkaConsumer,
+    incomingQueue: LinkedBlockingQueue[KafkaMessage],
+    fetchThreshold: Int,
+    fetchSleepMS: Long) extends Thread {
+  import io.gearpump.streaming.kafka.lib.consumer.FetchThread._
 
   private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers
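
The run loop itself is outside this hunk; a hedged sketch of the backpressure contract
described by the scaladoc (names and structure here are illustrative):

    import java.util.concurrent.LinkedBlockingQueue

    // stop fetching while the queue holds fetchThreshold or more messages,
    // and back off for fetchSleepMS when idle or above the threshold
    def step[T](queue: LinkedBlockingQueue[T], fetchThreshold: Int,
        fetchSleepMS: Long)(fetchOne: () => Boolean): Unit = {
      val fetched = queue.size < fetchThreshold && fetchOne()
      if (!fetched) {
        Thread.sleep(fetchSleepMS)
      }
    }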
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
index 208a99d..77321b9 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,6 @@
 
 package io.gearpump.streaming.kafka.lib.consumer
 
-import io.gearpump.streaming.kafka.lib.KafkaUtil
 import kafka.api.{FetchRequestBuilder, OffsetRequest}
 import kafka.common.ErrorMapping._
 import kafka.common.TopicAndPartition
@@ -26,8 +25,11 @@ import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.message.MessageAndOffset
 import kafka.utils.Utils
 
+import io.gearpump.streaming.kafka.lib.KafkaUtil
+
 object KafkaConsumer {
-  def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig): KafkaConsumer = {
+  def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig)
+    : KafkaConsumer = {
     val connectZk = KafkaUtil.connectZookeeper(config)
     val broker = KafkaUtil.getBroker(connectZk(), topic, partition)
     val soTimeout = config.socketTimeoutMs
@@ -40,7 +42,7 @@ object KafkaConsumer {
         .addFetch(topic, partition, offset, fetchSize)
         .build()
 
-    val response = consumer.fetch(request)
+      val response = consumer.fetch(request)
       response.errorCode(topic, partition) match {
         case NoError => response.messageSet(topic, partition).iterator
         case error => throw exceptionFor(error)
@@ -51,13 +53,14 @@ object KafkaConsumer {
 }
 
 /**
- * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over messages from a kafka kafka.common.TopicAndPartition.
+ * Uses kafka.consumer.SimpleConsumer to consume and iterate over
+ * messages from a kafka.common.TopicAndPartition.
  */
 class KafkaConsumer(consumer: SimpleConsumer,
-                    topic: String,
-                    partition: Int,
-                    getIterator: (Long) => Iterator[MessageAndOffset],
-                    startOffsetTime: Long = OffsetRequest.EarliestTime) {
+    topic: String,
+    partition: Int,
+    getIterator: (Long) => Iterator[MessageAndOffset],
+    startOffsetTime: Long = OffsetRequest.EarliestTime) {
   private val earliestOffset = consumer
     .earliestOrLatestOffset(TopicAndPartition(topic, partition), startOffsetTime, -1)
   private var nextOffset: Long = earliestOffset
@@ -97,6 +100,4 @@ class KafkaConsumer(consumer: SimpleConsumer,
   def close(): Unit = {
     consumer.close()
   }
-
-
 }
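
A short sketch of building one of these consumers through the companion apply shown
above (endpoints, topic and partition are placeholder values):

    import kafka.api.OffsetRequest
    import kafka.consumer.ConsumerConfig
    import io.gearpump.streaming.kafka.lib.KafkaUtil
    import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer

    // start from the earliest available offset of topic1, partition 0
    val config = new ConsumerConfig(KafkaUtil.buildConsumerConfig("localhost:2181"))
    val consumer = KafkaConsumer("topic1", 0, OffsetRequest.EarliestTime, config)
    // ... iterate over fetched messages ...
    consumer.close()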

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
index 18d1bf0..16330ed 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -28,9 +28,11 @@ import kafka.common.TopicAndPartition
  * @param msg message payload
  */
 case class KafkaMessage(topicAndPartition: TopicAndPartition, offset: Long,
-                        key: Option[Array[Byte]], msg: Array[Byte]) {
+    key: Option[Array[Byte]], msg: Array[Byte]) {
+
   def this(topic: String, partition: Int, offset: Long,
-    key: Option[Array[Byte]], msg: Array[Byte]) =
+      key: Option[Array[Byte]], msg: Array[Byte]) = {
     this(TopicAndPartition(topic, partition), offset, key, msg)
+  }
 }
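
The auxiliary constructor above merely wraps (topic, partition) into a TopicAndPartition,
so the two forms below are equivalent (payload values are placeholders):

    import kafka.common.TopicAndPartition
    import io.gearpump.streaming.kafka.lib.consumer.KafkaMessage

    val bytes = "payload".getBytes("UTF-8")
    val a = new KafkaMessage("topic1", 0, 42L, None, bytes)
    val b = KafkaMessage(TopicAndPartition("topic1", 0), 42L, None, bytes)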
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
index 4a384fc..0f968e2 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -30,7 +30,9 @@ import kafka.common.TopicAndPartition
  * streamProducer1 gets (topicA, partition2), (topicB, partition2)
  */
 class KafkaDefaultGrouper extends KafkaGrouper {
-  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition] = {
-    topicAndPartitions.indices.filter(_ % taskNum == taskIndex).map(i => topicAndPartitions(i)).toArray
+  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+    : Array[TopicAndPartition] = {
+    topicAndPartitions.indices.filter(_ % taskNum == taskIndex)
+      .map(i => topicAndPartitions(i)).toArray
   }
 }
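
A sketch reproducing the round-robin mapping from the scaladoc above, with two tasks and
four TopicAndPartitions:

    import kafka.common.TopicAndPartition
    import io.gearpump.streaming.kafka.lib.grouper.KafkaDefaultGrouper

    val tps = Array(
      TopicAndPartition("topicA", 1), TopicAndPartition("topicA", 2),
      TopicAndPartition("topicB", 1), TopicAndPartition("topicB", 2))
    val grouper = new KafkaDefaultGrouper
    // task 0 gets the even-indexed entries, task 1 the odd-indexed ones
    val task0 = grouper.group(taskNum = 2, taskIndex = 0, topicAndPartitions = tps)
    val task1 = grouper.group(taskNum = 2, taskIndex = 1, topicAndPartitions = tps)
    // task0: (topicA,1), (topicB,1); task1: (topicA,2), (topicB,2)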

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
index 035917b..6660a04 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,7 +24,7 @@ import kafka.common.TopicAndPartition
  * this class dispatches kafka kafka.common.TopicAndPartition to gearpump tasks
  */
 trait KafkaGrouper {
-  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition]
+  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+    : Array[TopicAndPartition]
 }
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
index 86bc099..2b00414 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,8 +19,6 @@
 package io.gearpump.streaming.kafka
 
 import com.twitter.bijection.Injection
-import io.gearpump.Message
-import io.gearpump.streaming.MockUtil
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
@@ -28,6 +26,9 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
+import io.gearpump.Message
+import io.gearpump.streaming.MockUtil
+
 class KafkaSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
   val dataGen = for {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
index e67b572..6cd78fc 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,10 @@
 
 package io.gearpump.streaming.kafka
 
+import scala.util.{Failure, Success}
+
 import com.twitter.bijection.Injection
-import io.gearpump.streaming.kafka.lib.{KafkaSourceConfig, KafkaOffsetManager}
-import io.gearpump.streaming.kafka.lib.consumer.{KafkaMessage, FetchThread}
-import io.gearpump.streaming.transaction.api.{OffsetStorageFactory, TimeStampFilter, MessageDecoder, OffsetStorage}
 import kafka.common.TopicAndPartition
-import io.gearpump.Message
-import io.gearpump.streaming.kafka.lib.consumer.FetchThread
-import io.gearpump.streaming.kafka.lib.KafkaOffsetManager
-import OffsetStorage.StorageEmpty
-import io.gearpump.streaming.transaction.api.OffsetStorageFactory
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalacheck.Gen
@@ -35,7 +29,11 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import scala.util.{Failure, Success}
+import io.gearpump.Message
+import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage}
+import io.gearpump.streaming.kafka.lib.{KafkaOffsetManager, KafkaSourceConfig}
+import io.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
+import io.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter}
 
 class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
@@ -50,8 +48,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
     val timestampFilter = mock[TimeStampFilter]
     val offsetStorageFactory = mock[OffsetStorageFactory]
     val kafkaConfig = mock[KafkaSourceConfig]
-    val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter,
-      Some(fetchThread), Map(topicAndPartition -> offsetManager))
+    val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+      timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
 
     kafkaSource.setStartTime(None)
 
@@ -68,8 +66,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
       val timestampFilter = mock[TimeStampFilter]
       val offsetStorageFactory = mock[OffsetStorageFactory]
       val kafkaConfig = mock[KafkaSourceConfig]
-      val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter,
-        Some(fetchThread), Map(topicAndPartition -> offsetManager))
+      val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+        timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
 
       when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(StorageEmpty))
 
@@ -95,8 +93,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
         val timestampFilter = mock[TimeStampFilter]
         val offsetStorageFactory = mock[OffsetStorageFactory]
         val kafkaConfig = mock[KafkaSourceConfig]
-        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter,
-          Some(fetchThread), Map(topicAndPartition -> offsetManager))
+        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+          timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
 
         when(offsetManager.resolveOffset(startTime)).thenReturn(Success(offset))
 
@@ -135,7 +133,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
         val kafkaConfig = mock[KafkaSourceConfig]
         val offsetManagers = kafkaMsgList.map(_.topicAndPartition -> offsetManager).toMap
 
-        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter, Some(fetchThread), offsetManagers)
+        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+          timestampFilter, Some(fetchThread), offsetManagers)
 
         if (number == 0) {
           verify(fetchThread, never()).poll
@@ -171,10 +170,9 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
     val messageDecoder = mock[MessageDecoder]
     val offsetStorageFactory = mock[OffsetStorageFactory]
     val kafkaConfig = mock[KafkaSourceConfig]
-    val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter, Some(fetchThread),
-      Map(topicAndPartition -> offsetManager))
+    val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+      timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
     source.close()
     verify(offsetManager).close()
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
index 2453adf..f3b5425 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,8 +20,8 @@ package io.gearpump.streaming.kafka.lib
 
 import com.twitter.bijection.Injection
 import org.scalacheck.Gen
-import org.scalatest.{PropSpec, Matchers}
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
 
 class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
   property("DefaultMessageDecoder should keep the original bytes data in Message") {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
index e185a29..c762b06 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,18 @@
 
 package io.gearpump.streaming.kafka.lib
 
+import scala.util.{Failure, Success}
+
 import com.twitter.bijection.Injection
-import io.gearpump.streaming.transaction.api.OffsetStorage
-import io.gearpump.Message
-import OffsetStorage.{Overflow, StorageEmpty, Underflow}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import scala.util.{Failure, Success}
+import io.gearpump.Message
+import io.gearpump.streaming.transaction.api.OffsetStorage
+import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
 
 class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
@@ -40,11 +41,12 @@ class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers
 
   val messageAndOffsetsGen = Gen.listOf[Message](messageGen).map(_.zipWithIndex)
 
-  property("KafkaOffsetManager should append offset to storage in monotonically increasing time order") {
+  property("KafkaOffsetManager should append offset to storage in monotonically" +
+    " increasing time order") {
     forAll(messageAndOffsetsGen) { (messageAndOffsets: List[(Message, Int)]) =>
       val offsetStorage = mock[OffsetStorage]
       val offsetManager = new KafkaOffsetManager(offsetStorage)
-      messageAndOffsets.foldLeft(0L){ (max, messageAndOffset) =>
+      messageAndOffsets.foldLeft(0L) { (max, messageAndOffset) =>
         val (message, offset) = messageAndOffset
         offsetManager.filter((message, offset.toLong)) shouldBe Option(message)
         if (message.timestamp > max) {
@@ -62,7 +64,8 @@ class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers
 
   val minTimeStampGen = Gen.choose[Long](0L, 500L)
   val maxTimeStampGen = Gen.choose[Long](500L, 1000L)
-  property("KafkaOffsetManager resolveOffset should report StorageEmpty failure when storage is empty") {
+  property("KafkaOffsetManager resolveOffset should " +
+    "report StorageEmpty failure when storage is empty") {
     forAll(timeStampGen) { (time: Long) =>
       val offsetStorage = mock[OffsetStorage]
       val offsetManager = new KafkaOffsetManager(offsetStorage)
@@ -78,16 +81,19 @@ class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers
   }
 
   val offsetGen = Gen.choose[Long](0L, 1000L)
-  property("KafkaOffsetManager resolveOffset should return a valid offset when storage is not empty") {
+  property("KafkaOffsetManager resolveOffset should return a valid" +
+    " offset when storage is not empty") {
     forAll(timeStampGen, minTimeStampGen, maxTimeStampGen, offsetGen) {
       (time: Long, min: Long, max: Long, offset: Long) =>
         val offsetStorage = mock[OffsetStorage]
         val offsetManager = new KafkaOffsetManager(offsetStorage)
         if (time < min) {
-          when(offsetStorage.lookUp(time)).thenReturn(Failure(Underflow(Injection[Long, Array[Byte]](min))))
+          when(offsetStorage.lookUp(time)).thenReturn(Failure(
+            Underflow(Injection[Long, Array[Byte]](min))))
           offsetManager.resolveOffset(time) shouldBe Success(min)
         } else if (time > max) {
-          when(offsetStorage.lookUp(time)).thenReturn(Failure(Overflow(Injection[Long, Array[Byte]](max))))
+          when(offsetStorage.lookUp(time)).thenReturn(Failure(
+            Overflow(Injection[Long, Array[Byte]](max))))
           offsetManager.resolveOffset(time) shouldBe Success(max)
         } else {
           when(offsetStorage.lookUp(time)).thenReturn(Success(Injection[Long, Array[Byte]](offset)))
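
The properties above pin down resolveOffset's clamping semantics: a lookup before the stored range resolves to the minimum offset, one past it resolves to the maximum, and an in-range lookup returns the stored offset. A compact sketch of that logic, with simplified stand-ins for the storage error types rather than the real OffsetStorage API:

import scala.util.{Failure, Success, Try}

object ResolveClampSketch {
  // Simplified stand-ins for the storage lookup errors.
  sealed trait LookupError extends RuntimeException
  case class Underflow(min: Long) extends LookupError
  case class Overflow(max: Long) extends LookupError

  // Clamp a timestamp lookup to the stored offset range.
  def resolve(lookUp: Long => Try[Long])(time: Long): Try[Long] =
    lookUp(time) match {
      case Failure(Underflow(min)) => Success(min) // before range: earliest offset
      case Failure(Overflow(max))  => Success(max) // past range: latest offset
      case other                   => other        // in range, or a hard failure
    }
}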

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
index bcc757b..af23c12 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,163 +18,170 @@
 
 package io.gearpump.streaming.kafka.lib
 
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-  val minTimeGen = Gen.choose[Long](1L, 500L)
-  val maxTimeGen = Gen.choose[Long](500L, 999L)
-
- /* property("KafkaStorage lookup time should report StorageEmpty if storage is empty") {
-    forAll { (time: Long, topic: String) =>
-      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-      val getConsumer = () => mock[KafkaConsumer]
-      val connectZk = () => mock[ZkClient]
-      val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(), connectZk())
-      storage.lookUp(time) shouldBe Failure(StorageEmpty)
-    }
-  }
-
-  property("KafkaStorage lookup time should return data or report failure if storage is not empty") {
-    forAll(minTimeGen, maxTimeGen, Gen.alphaStr) { (minTime: Long, maxTime: Long, topic: String) =>
-      val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) =>
-        val offset = index.toLong
-        time -> offset
-      }
-      val timeAndOffsetsMap = timeAndOffsets.toMap
-      val data = timeAndOffsets.map {
-        case (time, offset) =>
-          new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)),
-            Injection[Long, Array[Byte]](offset))
-      }.toList
-
-      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-      val consumer = mock[KafkaConsumer]
-      val getConsumer = () => consumer
-      val connectZk = () => mock[ZkClient]
-
-      val hasNexts = List.fill(data.tail.size)(true) :+ false
-      when(consumer.hasNext).thenReturn(true, hasNexts:_*)
-      when(consumer.next).thenReturn(data.head, data.tail:_*)
-
-      val storage = new KafkaStorage(topic, topicExists = true, producer, getConsumer(), connectZk())
-      forAll(Gen.choose[Long](minTime, maxTime)) {
-        time =>
-          storage.lookUp(time) match {
-            case Success(array) => array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time)))
-            case Failure(e)     => fail("time in range should return Success with value")
-          }
-      }
-
-      forAll(Gen.choose[Long](0L, minTime - 1)) {
-        time =>
-          storage.lookUp(time) match {
-            case Failure(e) => e shouldBe a [Underflow]
-              e.asInstanceOf[Underflow].min should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime)))
-            case Success(_) => fail("time less than min should return Underflow failure")
-          }
-      }
-
-      forAll(Gen.choose[Long](maxTime + 1, 1000L)) {
-        time =>
-          storage.lookUp(time) match {
-            case Failure(e) => e shouldBe a [Overflow]
-              e.asInstanceOf[Overflow].max should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime)))
-            case Success(_) => fail("time larger than max should return Overflow failure")
-          }
-      }
-    }
-  }
-
-  property("KafkaStorage append should send data to Kafka") {
-    forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000), Gen.alphaStr, Gen.oneOf(true, false)) {
-      (time: Long, offset: Long, topic: String, topicExists: Boolean) =>
-      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-      val getConsumer = () => mock[KafkaConsumer]
-      val connectZk = () => mock[ZkClient]
-      val storage = new KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk())
-      val offsetBytes = Injection[Long, Array[Byte]](offset)
-      storage.append(time, offsetBytes)
-      verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]())
-    }
-  }
-
-  val topicAndPartitionGen = for {
-    topic <- Gen.alphaStr
-    partition <- Gen.choose[Int](0, 100)
-  } yield TopicAndPartition(topic, partition)
-  property("KafkaStorage should load data from Kafka") {
-    val kafkaMsgGen = for {
-      timestamp <- Gen.choose[Long](1L, 1000L)
-      offset    <- Gen.choose[Long](0L, 1000L)
-    } yield (timestamp, Injection[Long, Array[Byte]](offset))
-    val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen)
-
-    val topicExistsGen = Gen.oneOf(true, false)
-
-    forAll(topicAndPartitionGen, msgListGen) {
-      (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) =>
-        val producer=  mock[KafkaProducer[Array[Byte], Array[Byte]]]
-        val consumer = mock[KafkaConsumer]
-        val getConsumer = () => consumer
-        val connectZk = () => mock[ZkClient]
-        val kafkaStorage = new KafkaStorage(topicAndPartition.topic, topicExists = true, producer, getConsumer(), connectZk())
-          msgList match {
-            case Nil =>
-              when(consumer.hasNext).thenReturn(false)
-            case list =>
-              val hasNexts = List.fill(list.tail.size)(true) :+ false
-              val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) =>
-                KafkaMessage(topicAndPartition, index.toLong, Some(Injection[Long, Array[Byte]](timestamp)), bytes)
-              }
-              when(consumer.hasNext).thenReturn(true, hasNexts: _*)
-              when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*)
-          }
-          kafkaStorage.load(consumer) shouldBe msgList
-    }
-  }
-
-  property("KafkaStorage should not get consumer when topic doesn't exist") {
-    forAll(Gen.alphaStr) { (topic: String) =>
-      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-      val getConsumer = mock[() => KafkaConsumer]
-      val connectZk = () => mock[ZkClient]
-      val kafkaStorage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(), connectZk())
-      verify(getConsumer, never()).apply()
-      kafkaStorage.close()
-    }
-  }
-
-  property("KafkaStorage should fail to load invalid KafkaMessage") {
-    val invalidKafkaMsgGen = for {
-      tp <- topicAndPartitionGen
-      offset <- Gen.choose[Long](1L, 1000L)
-      timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))), None)
-      msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
-    } yield KafkaMessage(tp, offset, timestamp, msg)
-    forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) =>
-      val consumer = mock[KafkaConsumer]
-      val getConsumer = () => consumer
-      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-      val connectZk = () => mock[ZkClient]
-      val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic, topicExists = true,
-        producer, getConsumer(), connectZk())
-      when(consumer.hasNext).thenReturn(true, false)
-      when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg)
-      Try(kafkaStorage.load(consumer)).isFailure shouldBe true
-    }
-  }
-
-  property("KafkaStorage close should close kafka producer and delete topic") {
-    val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-    val getConsumer = () => mock[KafkaConsumer]
-    val zkClient = mock[ZkClient]
-    val connectZk = () => zkClient
-    val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk())
-    kafkaStorage.close()
-    verify(producer).close()
-    verify(zkClient).createPersistent(anyString(), anyString())
-  }*/
-}
+// TODO: Fix the UT failure!
+
+// class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+//  val minTimeGen = Gen.choose[Long](1L, 500L)
+//  val maxTimeGen = Gen.choose[Long](500L, 999L)
+//
+//  property("KafkaStorage lookup time should report StorageEmpty if storage is empty") {
+//    forAll { (time: Long, topic: String) =>
+//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+//      val getConsumer = () => mock[KafkaConsumer]
+//      val connectZk = () => mock[ZkClient]
+//      val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(),
+//        connectZk())
+//      storage.lookUp(time) shouldBe Failure(StorageEmpty)
+//    }
+//  }
+//
+//  property("KafkaStorage lookup time should return data or report failure if storage not empty") {
+//    forAll(minTimeGen, maxTimeGen, Gen.alphaStr) { (minTime: Long, maxTime: Long, topic: String) =>
+//      val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) =>
+//        val offset = index.toLong
+//        time -> offset
+//      }
+//      val timeAndOffsetsMap = timeAndOffsets.toMap
+//      val data = timeAndOffsets.map {
+//        case (time, offset) =>
+//          new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)),
+//            Injection[Long, Array[Byte]](offset))
+//      }.toList
+//
+//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+//      val consumer = mock[KafkaConsumer]
+//      val getConsumer = () => consumer
+//      val connectZk = () => mock[ZkClient]
+//
+//      val hasNexts = List.fill(data.tail.size)(true) :+ false
+//      when(consumer.hasNext).thenReturn(true, hasNexts:_*)
+//      when(consumer.next).thenReturn(data.head, data.tail:_*)
+//
+//      val storage = new KafkaStorage(topic, topicExists = true, producer,
+//        getConsumer(), connectZk())
+//      forAll(Gen.choose[Long](minTime, maxTime)) {
+//        time =>
+//          storage.lookUp(time) match {
+//            case Success(array) =>
+//              array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time)))
+//            case Failure(e)     => fail("time in range should return Success with value")
+//          }
+//      }
+//
+//      forAll(Gen.choose[Long](0L, minTime - 1)) {
+//        time =>
+//          storage.lookUp(time) match {
+//            case Failure(e) => e shouldBe a [Underflow]
+//              e.asInstanceOf[Underflow].min should equal
+//                (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime)))
+//            case Success(_) => fail("time less than min should return Underflow failure")
+//          }
+//      }
+//
+//      forAll(Gen.choose[Long](maxTime + 1, 1000L)) {
+//        time =>
+//          storage.lookUp(time) match {
+//            case Failure(e) => e shouldBe a [Overflow]
+//              e.asInstanceOf[Overflow].max should equal
+//                (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime)))
+//            case Success(_) => fail("time larger than max should return Overflow failure")
+//          }
+//      }
+//    }
+//  }
+//
+//  property("KafkaStorage append should send data to Kafka") {
+//    forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000),
+//      Gen.alphaStr, Gen.oneOf(true, false)) {
+//      (time: Long, offset: Long, topic: String, topicExists: Boolean) =>
+//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+//      val getConsumer = () => mock[KafkaConsumer]
+//      val connectZk = () => mock[ZkClient]
+//      val storage = new KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk())
+//      val offsetBytes = Injection[Long, Array[Byte]](offset)
+//      storage.append(time, offsetBytes)
+//      verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]())
+//    }
+//  }
+//
+//  val topicAndPartitionGen = for {
+//    topic <- Gen.alphaStr
+//    partition <- Gen.choose[Int](0, 100)
+//  } yield TopicAndPartition(topic, partition)
+//  property("KafkaStorage should load data from Kafka") {
+//    val kafkaMsgGen = for {
+//      timestamp <- Gen.choose[Long](1L, 1000L)
+//      offset    <- Gen.choose[Long](0L, 1000L)
+//    } yield (timestamp, Injection[Long, Array[Byte]](offset))
+//    val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen)
+//
+//    val topicExistsGen = Gen.oneOf(true, false)
+//
+//    forAll(topicAndPartitionGen, msgListGen) {
+//      (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) =>
+//        val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+//        val consumer = mock[KafkaConsumer]
+//        val getConsumer = () => consumer
+//        val connectZk = () => mock[ZkClient]
+//        val kafkaStorage = new KafkaStorage(topicAndPartition.topic,
+//          topicExists = true, producer, getConsumer(), connectZk())
+//          msgList match {
+//            case Nil =>
+//              when(consumer.hasNext).thenReturn(false)
+//            case list =>
+//              val hasNexts = List.fill(list.tail.size)(true) :+ false
+//              val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) =>
+//                KafkaMessage(topicAndPartition, index.toLong,
+//                  Some(Injection[Long, Array[Byte]](timestamp)), bytes)
+//              }
+//              when(consumer.hasNext).thenReturn(true, hasNexts: _*)
+//              when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*)
+//          }
+//          kafkaStorage.load(consumer) shouldBe msgList
+//    }
+//  }
+//
+//  property("KafkaStorage should not get consumer when topic doesn't exist") {
+//    forAll(Gen.alphaStr) { (topic: String) =>
+//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+//      val getConsumer = mock[() => KafkaConsumer]
+//      val connectZk = () => mock[ZkClient]
+//      val kafkaStorage = new KafkaStorage(topic,
+//        topicExists = false, producer, getConsumer(), connectZk())
+//      verify(getConsumer, never()).apply()
+//      kafkaStorage.close()
+//    }
+//  }
+//
+//  property("KafkaStorage should fail to load invalid KafkaMessage") {
+//    val invalidKafkaMsgGen = for {
+//      tp <- topicAndPartitionGen
+//      offset <- Gen.choose[Long](1L, 1000L)
+//      timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))),
+//        None)
+//      msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
+//    } yield KafkaMessage(tp, offset, timestamp, msg)
+//    forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) =>
+//      val consumer = mock[KafkaConsumer]
+//      val getConsumer = () => consumer
+//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+//      val connectZk = () => mock[ZkClient]
+//      val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic,
+//        topicExists = true, producer, getConsumer(), connectZk())
+//      when(consumer.hasNext).thenReturn(true, false)
+//      when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg)
+//      Try(kafkaStorage.load(consumer)).isFailure shouldBe true
+//    }
+//  }
+//
+//  property("KafkaStorage close should close kafka producer and delete topic") {
+//    val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+//    val getConsumer = () => mock[KafkaConsumer]
+//    val zkClient = mock[ZkClient]
+//    val connectZk = () => zkClient
+//    val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk())
+//    kafkaStorage.close()
+//    verify(producer).close()
+//    verify(zkClient).createPersistent(anyString(), anyString())
+//  }
+// }
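
Until the TODO above is resolved, the whole spec stays commented out. One pattern worth noting in it is how a mocked consumer is driven like an iterator: the varargs thenReturn supplies one answer per call, and hasNext flips to false after the last element. A runnable distillation of that pattern; Consumer is a two-method stand-in, not the real KafkaConsumer:

import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar

object IteratorStubSketch extends MockitoSugar {
  trait Consumer {
    def hasNext: Boolean
    def next: String
  }

  def main(args: Array[String]): Unit = {
    val consumer = mock[Consumer]
    val data = List("a", "b", "c")
    // One hasNext answer per element, then false to end the stream.
    val hasNexts = List.fill(data.tail.size)(true) :+ false
    when(consumer.hasNext).thenReturn(true, hasNexts: _*)
    when(consumer.next).thenReturn(data.head, data.tail: _*)

    val drained = Iterator.continually(consumer)
      .takeWhile(_.hasNext).map(_.next).toList
    assert(drained == data)
  }
}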

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
index 24c1c5d..7447308 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,34 +18,36 @@
 
 package io.gearpump.streaming.kafka.lib
 
-import io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper
-import io.gearpump.streaming.kafka.util.KafkaServerHarness
 import kafka.common.TopicAndPartition
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.server.{KafkaConfig => KafkaServerConfig}
+import kafka.utils.{TestUtils, TestZKUtils}
 import org.scalatest.prop.PropertyChecks
-import org.scalatest.{PropSpec, Matchers, BeforeAndAfterEach}
+import org.scalatest.{BeforeAndAfterEach, Matchers, PropSpec}
 
-import kafka.server.{KafkaConfig => KafkaServerConfig}
+import io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper
+import io.gearpump.streaming.kafka.util.KafkaServerHarness
 
+class KafkaUtilSpec
+  extends PropSpec with PropertyChecks with BeforeAndAfterEach
+  with Matchers with KafkaServerHarness {
 
-class KafkaUtilSpec extends PropSpec with PropertyChecks with BeforeAndAfterEach with Matchers with KafkaServerHarness {
   val numServers = 1
   override val configs: List[KafkaServerConfig] =
-    for(props <- TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false))
-    yield new KafkaServerConfig(props) {
-      override val zkConnect = TestZKUtils.zookeeperConnect
-      override val numPartitions = 4
-    }
+    for (props <- TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false))
+      yield new KafkaServerConfig(props) {
+        override val zkConnect = TestZKUtils.zookeeperConnect
+        override val numPartitions = 4
+      }
 
-  override def beforeEach: Unit = {
+  override def beforeEach(): Unit = {
     super.setUp()
   }
 
-  override def afterEach: Unit = {
+  override def afterEach(): Unit = {
     super.tearDown()
   }
 
-  import KafkaUtil._
+  import io.gearpump.streaming.kafka.lib.KafkaUtil._
 
   property("KafkaUtil should be able to create topic") {
     val args = {
@@ -91,13 +93,15 @@ class KafkaUtilSpec extends PropSpec with PropertyChecks with BeforeAndAfterEach
 
   property("KafkaUtil should be able to get TopicAndPartitions info and group with KafkaGrouper") {
     val grouper: KafkaGrouper = new KafkaGrouper {
-      override def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition] =
+      override def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+        : Array[TopicAndPartition] = {
         topicAndPartitions
+      }
     }
     val topicNum = 3
     val topics = List.fill(topicNum)(TestUtils.tempTopic())
     topics.foreach(t => createTopicUntilLeaderIsElected(t, partitions = 1, replicas = 1))
-    KafkaUtil.getTopicAndPartitions(connectZk(), topics).toSet shouldBe topics.map(t => TopicAndPartition(t, 0)).toSet
+    KafkaUtil.getTopicAndPartitions(connectZk(), topics).toSet shouldBe
+      topics.map(t => TopicAndPartition(t, 0)).toSet
   }
-
 }
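
The anonymous grouper in this test is deliberately trivial: every task keeps every TopicAndPartition. In practice a KafkaGrouper encodes a partitioning policy. A sketch of one plausible policy, splitting partitions across tasks by index modulo the task count; this is illustrative only, not a grouper shipped with gearpump:

import kafka.common.TopicAndPartition
import io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper

class ModuloGrouper extends KafkaGrouper {
  override def group(taskNum: Int, taskIndex: Int,
      topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition] = {
    // Each task keeps the partitions whose index lands in its own slot.
    topicAndPartitions.zipWithIndex.collect {
      case (tp, i) if i % taskNum == taskIndex => tp
    }
  }
}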

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
index a20e575..3983874 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,20 +7,18 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package io.gearpump.streaming.kafka.lib.consumer
 
-import org.scalatest.{WordSpec, Matchers}
+import org.scalatest.{Matchers, WordSpec}
 
 class ExponentialBackoffSleeperSpec extends WordSpec with Matchers {
 
@@ -69,4 +66,3 @@ class ExponentialBackoffSleeperSpec extends WordSpec with Matchers {
   }
 }
 
-
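
The subject of this spec is a sleeper whose wait grows geometrically between retries up to a cap. A minimal sketch of that policy; the constructor parameters, defaults, and method names here are assumptions, and the real class may differ:

class ExponentialBackoffSleeper(
    backOffMultiplier: Double = 2.0,
    initialDurationMs: Long = 100L,
    maximumDurationMs: Long = 10000L) {
  private var duration = initialDurationMs

  // Start over after a successful attempt.
  def reset(): Unit = duration = initialDurationMs

  // Sleep, then grow the next wait geometrically, capped at the maximum.
  def sleepAndBackOff(): Unit = {
    Thread.sleep(duration)
    duration = math.min((duration * backOffMultiplier).toLong, maximumDurationMs)
  }
}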

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
index 92d3c31..d955dfa 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -35,14 +35,14 @@ class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with Mo
   property("FetchThread should set startOffset to iterators") {
     forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) {
       (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) =>
-      val topicAndPartition = mock[TopicAndPartition]
-      val consumer = mock[KafkaConsumer]
-      val createConsumer =  (tp: TopicAndPartition) => consumer
-      val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-      val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer,
-        incomingQueue, fetchThreshold, fetchSleepMS)
-      fetchThread.setStartOffset(topicAndPartition, startOffset)
-      verify(consumer).setStartOffset(startOffset)
+        val topicAndPartition = mock[TopicAndPartition]
+        val consumer = mock[KafkaConsumer]
+        val createConsumer = (tp: TopicAndPartition) => consumer
+        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+        val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer,
+          incomingQueue, fetchThreshold, fetchSleepMS)
+        fetchThread.setStartOffset(topicAndPartition, startOffset)
+        verify(consumer).setStartOffset(startOffset)
     }
   }
 
@@ -50,10 +50,11 @@ class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with Mo
     topic <- Gen.alphaStr
     partition <- Gen.choose[Int](0, Int.MaxValue)
   } yield TopicAndPartition(topic, partition)
-  property("FetchThread should only fetchMessage when the number of messages in queue is below the threshold") {
+  property("FetchThread should only fetchMessage when the number " +
+    "of messages in queue is below the threshold") {
     forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) {
       (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int,
-       startOffset: Long, topicAndPartition: TopicAndPartition) =>
+        startOffset: Long, topicAndPartition: TopicAndPartition) =>
         val message = mock[KafkaMessage]
         val consumer = mock[KafkaConsumer]
         val createConsumer = (tp: TopicAndPartition) => consumer
@@ -87,8 +88,12 @@ class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with Mo
     tp <- topicAndPartitionGen
     hasNext <- Gen.oneOf(true, false)
   } yield (tp, hasNext)
-  val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen).map(_.toMap) suchThat (_.nonEmpty)
-  property("FetchThread fetchMessage should return false when there are no more messages from any TopicAndPartition") {
+
+  val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen)
+    .map(_.toMap) suchThat (_.nonEmpty)
+
+  property("FetchThread fetchMessage should return false when there are no more messages " +
+    "from any TopicAndPartition") {
     forAll(tpHasNextMapGen, nonNegativeGen) {
       (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) =>
         val createConsumer = (tp: TopicAndPartition) => {
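
The reworded property names the invariant precisely: FetchThread polls only while the incoming queue holds fewer than fetchThreshold messages, and backs off for fetchSleepMS otherwise. A self-contained sketch of that gate, with types simplified; this is not the real FetchThread:

import java.util.concurrent.LinkedBlockingQueue

class ThresholdGatedFetcher[T](
    incomingQueue: LinkedBlockingQueue[T],
    fetchThreshold: Int,
    fetchSleepMS: Long)(fetchOne: () => Option[T]) {

  // One scheduling round: fetch while below the threshold, otherwise back off.
  def runOnce(): Unit = {
    if (incomingQueue.size < fetchThreshold) {
      fetchOne().foreach(incomingQueue.put)
    } else {
      Thread.sleep(fetchSleepMS) // queue is full enough; let downstream drain it
    }
  }
}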

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
index 2669f60..524fc9b 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,