You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/05/18 06:03:51 UTC

[2/2] incubator-gearpump git commit: [GEARPUMP-312] Add Message trait and DefaultMessage impl

[GEARPUMP-312] Add Message trait and DefaultMessage impl

Author: manuzhang <ow...@gmail.com>

Closes #183 from manuzhang/message_trait.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/176d8276
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/176d8276
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/176d8276

Branch: refs/heads/master
Commit: 176d82763550ff59b65038c264c63d6b11951996
Parents: 000e846
Author: manuzhang <ow...@gmail.com>
Authored: Thu May 18 14:02:51 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Thu May 18 14:03:34 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/gearpump/Message.scala     | 47 ++++++++----
 .../streaming/examples/complexdag/Node.scala    |  4 +-
 .../streaming/examples/complexdag/Sink.scala    |  2 +-
 .../streaming/examples/complexdag/Source.scala  |  2 +-
 .../examples/complexdag/NodeSpec.scala          |  2 +-
 .../examples/complexdag/SourceSpec.scala        |  3 +-
 .../examples/fsio/SeqFileStreamProcessor.scala  |  2 +-
 .../fsio/SeqFileStreamProducerSpec.scala        |  2 +-
 .../examples/kafka/wordcount/Split.scala        |  4 +-
 .../examples/kafka/wordcount/Sum.scala          |  4 +-
 .../examples/sol/SOLStreamProcessor.scala       |  3 +-
 .../examples/sol/SOLStreamProducer.scala        |  2 +-
 .../state/processor/CountProcessor.scala        |  3 +-
 .../processor/WindowAverageProcessor.scala      |  4 +-
 .../state/DefaultMessageCountAppSpec.scala      | 79 ++++++++++++++++++++
 .../examples/state/MessageCountAppSpec.scala    | 79 --------------------
 .../state/processor/CountProcessorSpec.scala    |  1 -
 .../streaming/examples/wordcountjava/Split.java |  3 +-
 .../streaming/examples/wordcountjava/Sum.java   |  4 +-
 .../examples/wordcountjava/dsl/WordCount.java   |  3 +-
 .../streaming/examples/wordcount/Sum.scala      |  7 +-
 .../gearpump/akkastream/task/BatchTask.scala    |  9 +--
 .../gearpump/akkastream/task/ConcatTask.scala   |  2 +-
 .../akkastream/task/DelayInitialTask.scala      |  5 +-
 .../akkastream/task/DropWithinTask.scala        |  4 +-
 .../akkastream/task/FlattenMergeTask.scala      |  2 +-
 .../gearpump/akkastream/task/FoldTask.scala     |  6 +-
 .../akkastream/task/GroupedWithinTask.scala     |  2 +-
 .../akkastream/task/InterleaveTask.scala        |  2 +-
 .../gearpump/akkastream/task/MapAsyncTask.scala |  6 +-
 .../gearpump/akkastream/task/MergeTask.scala    |  2 +-
 .../akkastream/task/SingleSourceTask.scala      |  2 +-
 .../akkastream/task/SinkBridgeTask.scala        |  4 +-
 .../akkastream/task/SourceBridgeTask.scala      |  2 +-
 .../akkastream/task/StatefulMapConcatTask.scala |  4 +-
 .../akkastream/task/TakeWithinTask.scala        |  4 +-
 .../gearpump/akkastream/task/ThrottleTask.scala |  5 +-
 .../akkastream/task/TickSourceTask.scala        |  4 +-
 .../gearpump/akkastream/task/Unzip2Task.scala   |  6 +-
 .../gearpump/akkastream/task/Zip2Task.scala     |  4 +-
 .../experimental/rabbitmq/RMQSink.scala         |  2 +-
 .../org/apache/gearpump/redis/RedisSink.scala   |  2 +-
 .../storm/partitioner/StormPartitioner.scala    |  2 +-
 .../storm/processor/StormProcessor.scala        |  2 +-
 .../storm/topology/GearpumpStormComponent.scala |  4 +-
 .../partitioner/StormPartitionerSpec.scala      |  3 +-
 .../storm/util/StormOutputCollectorSpec.scala   | 13 ++--
 .../streaming/hadoop/SequenceFileSink.scala     |  1 -
 .../lib/format/DefaultSequenceFormatter.scala   |  3 +-
 .../hadoop/lib/format/OutputFormatter.scala     |  1 -
 .../gearpump/external/hbase/HBaseSink.scala     |  2 +-
 .../kafka/lib/sink/AbstractKafkaSink.scala      |  2 +-
 .../kafka/lib/source/AbstractKafkaSource.scala  |  3 +-
 .../source/DefaultKafkaMessageDecoderSpec.scala |  4 +-
 .../dsl/partitioner/GroupByPartitioner.scala    |  2 +-
 .../streaming/dsl/scalaapi/Stream.scala         |  2 +-
 .../streaming/dsl/task/TransformTask.scala      | 23 +++---
 .../streaming/dsl/window/impl/Window.scala      |  2 +-
 .../dsl/window/impl/WindowRunner.scala          |  2 +-
 .../partitioner/BroadcastPartitioner.scala      |  4 +-
 .../streaming/partitioner/HashPartitioner.scala |  2 +-
 .../gearpump/streaming/source/Watermark.scala   |  3 +-
 .../streaming/state/api/PersistentTask.scala    |  4 +-
 .../streaming/task/ExpressTransport.scala       |  5 +-
 .../gearpump/streaming/task/Subscription.scala  | 11 +--
 .../apache/gearpump/streaming/task/Task.scala   |  1 -
 .../gearpump/streaming/task/TaskActor.scala     |  2 +-
 .../gearpump/streaming/task/TaskWrapper.scala   |  2 +-
 .../streaming/appmaster/TaskManagerSpec.scala   |  2 +-
 .../dsl/plan/functions/FunctionRunnerSpec.scala |  6 +-
 .../streaming/dsl/scalaapi/StreamSpec.scala     |  4 +-
 .../streaming/dsl/task/TransformTaskSpec.scala  |  4 +-
 .../streaming/task/SubscriptionSpec.scala       |  6 +-
 73 files changed, 233 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/core/src/main/scala/org/apache/gearpump/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala b/core/src/main/scala/org/apache/gearpump/Message.scala
index 9965565..4dc5c09 100644
--- a/core/src/main/scala/org/apache/gearpump/Message.scala
+++ b/core/src/main/scala/org/apache/gearpump/Message.scala
@@ -20,34 +20,41 @@ package org.apache.gearpump
 
 import java.time.Instant
 
+trait Message {
+
+  val value: Any
+
+  val timestamp: Instant
+}
+
 /**
  * Each message contains an immutable timestamp.
  *
  * For example, if you take a picture, the time you take the picture is the
  * message's timestamp.
  *
- * @param msg Accept any type except Null, Nothing and Unit
+ * @param value Accept any type except Null, Nothing and Unit
  */
-case class Message(msg: Any, timeInMillis: TimeStamp) {
+case class DefaultMessage(value: Any, timeInMillis: TimeStamp) extends Message {
 
   /**
-   * @param msg Accept any type except Null, Nothing and Unit
+   * @param value Accept any type except Null, Nothing and Unit
    * @param timestamp timestamp cannot be larger than Instant.ofEpochMilli(Long.MaxValue)
    */
-  def this(msg: Any, timestamp: Instant) = {
-    this(msg, timestamp.toEpochMilli)
+  def this(value: Any, timestamp: Instant) = {
+    this(value, timestamp.toEpochMilli)
   }
 
   /**
    * Instant.EPOCH is used for default timestamp
    *
-   * @param msg Accept any type except Null, Nothing and Uni
+   * @param value Accept any type except Null, Nothing and Unit
    */
-  def this(msg: Any) = {
-    this(msg, Instant.EPOCH)
+  def this(value: Any) = {
+    this(value, Instant.EPOCH)
   }
 
-  def timestamp: Instant = {
+  override val timestamp: Instant = {
     Instant.ofEpochMilli(timeInMillis)
   }
 }
@@ -57,17 +64,25 @@ object Message {
   /**
    * Instant.EPOCH is used for default timestamp
    *
-   * @param msg Accept any type except Null, Nothing and Uni
+   * @param value Accept any type except Null, Nothing and Unit
    */
-  def apply(msg: Any): Message = {
-    new Message(msg)
+  def apply(value: Any): Message = {
+    new DefaultMessage(value)
   }
 
   /**
-   * @param msg Accept any type except Null, Nothing and Unit
-   * @param timestamp timestamp cannot be larger than Instant.ofEpochMilli(Long.MaxValue)
+   * @param value Accept any type except Null, Nothing and Unit
+   * @param timestamp timestamp must be smaller than Long.MaxValue
+   */
+  def apply(value: Any, timestamp: TimeStamp): Message = {
+    DefaultMessage(value, timestamp)
+  }
+
+  /**
+   * @param value Accept any type except Null, Nothing and Unit
+   * @param timestamp timestamp must be smaller than Instant.ofEpochMilli(Long.MaxValue)
    */
-  def apply(msg: Any, timestamp: Instant): Message = {
-    new Message(msg, timestamp)
+  def apply(value: Any, timestamp: Instant): Message = {
+    new DefaultMessage(value, timestamp)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
index ddd4d1a..dbc0efa 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
@@ -30,7 +30,7 @@ class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
   override def onStart(startTime: Instant): Unit = {}
 
   override def onNext(msg: Message): Unit = {
-    val list = msg.msg.asInstanceOf[Vector[String]]
-    output(new Message(list :+ getClass.getCanonicalName, System.currentTimeMillis()))
+    val list = msg.value.asInstanceOf[Vector[String]]
+    output(Message(list :+ getClass.getCanonicalName, System.currentTimeMillis()))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
index e9b00a0..8cc23ff 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
@@ -35,7 +35,7 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
   }
 
   override def onNext(msg: Message): Unit = {
-    val l = msg.msg.asInstanceOf[Vector[String]]
+    val l = msg.value.asInstanceOf[Vector[String]]
     list.size match {
       case 1 =>
         l.foreach(f => {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
index 074b389..e3bc29a 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
@@ -35,7 +35,7 @@ class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContex
   override def onNext(msg: Message): Unit = {
     val list = Vector(getClass.getCanonicalName)
     val now = Instant.now
-    output(new Message(list, now.toEpochMilli))
+    output(Message(list, now.toEpochMilli))
     self ! Watermark(now)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
index 241e0f6..f5376d5 100644
--- a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
@@ -36,6 +36,6 @@ class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAnd
     val list = Vector(classOf[Node].getCanonicalName)
     val expected = Vector(classOf[Node].getCanonicalName, classOf[Node].getCanonicalName)
     node.onNext(Message(list))
-    verify(context).output(argMatch[Message](_.msg == expected))
+    verify(context).output(argMatch[Message](_.value == expected))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
index 20cad1c..f445566 100644
--- a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
@@ -38,7 +38,8 @@ class SourceSpec extends WordSpec with Matchers {
       val source = new Source(context, UserConfig.empty)
       source.onNext(Message("start"))
 
-      verify(context).output(argMatch[Message](Vector(classOf[Source].getCanonicalName) == _.msg))
+      verify(context).output(argMatch[Message](
+        Vector(classOf[Source].getCanonicalName) == _.value))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
index 561346e..229f073 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
@@ -63,7 +63,7 @@ class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
   }
 
   override def onNext(msg: Message): Unit = {
-    val kv = msg.msg.asInstanceOf[String].split("\\+\\+")
+    val kv = msg.value.asInstanceOf[String].split("\\+\\+")
     if (kv.length >= 2) {
       key.set(kv(0))
       value.set(kv(1))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
index a03e68d..215cbfd 100644
--- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
@@ -79,7 +79,7 @@ class SeqFileStreamProducerSpec
 
     val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet
     verify(context).output(argMatch[Message](msg =>
-      expected.contains(msg.msg.asInstanceOf[String])))
+      expected.contains(msg.value.asInstanceOf[String])))
   }
 
   after {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
index b78e788..4250a43 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
@@ -32,8 +32,8 @@ class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext
   }
 
   override def onNext(msg: Message): Unit = {
-    Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]])
+    Injection.invert[String, Array[Byte]](msg.value.asInstanceOf[Array[Byte]])
       .foreach(_.split("\\s+").foreach(
-        word => output(new Message(word, msg.timestamp))))
+        word => output(Message(word, msg.timestamp))))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
index 58bb884..d1a98d0 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
@@ -33,10 +33,10 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
   override def onStart(startTime: Instant): Unit = {}
 
   override def onNext(message: Message): Unit = {
-    val word = message.msg.asInstanceOf[String]
+    val word = message.value.asInstanceOf[String]
     val count = wordcount.getOrElse(word, 0L) + 1
     wordcount += word -> count
-    output(new Message(
+    output(Message(
       Injection[String, Array[Byte]](word) ->
         Injection[Long, Array[Byte]](count),
       message.timestamp))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
index a16cf4c..9fb581d 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
@@ -20,10 +20,9 @@ package org.apache.gearpump.streaming.examples.sol
 
 import java.time.Instant
 import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
 
+import scala.concurrent.duration.FiniteDuration
 import akka.actor.Cancellable
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.{Task, TaskContext}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
index 2b443e5..973a84e 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
@@ -61,7 +61,7 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
 
   override def onNext(msg: Message): Unit = {
     val message = messages(rand.nextInt(messages.length))
-    output(new Message(message, System.currentTimeMillis()))
+    output(Message(message, System.currentTimeMillis()))
     messageCount = messageCount + 1L
     self ! Watermark(Instant.now)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
index 4efc6e1..7f4bc22 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
@@ -38,7 +38,8 @@ class CountProcessor(taskContext: TaskContext, conf: UserConfig)
 
   override def processMessage(state: PersistentState[Int], message: Message): Unit = {
     state.update(message.timestamp.toEpochMilli, 1)
-    state.get.foreach(s => taskContext.output(Message(serializer.serialize(s), message.timestamp)))
+    state.get.foreach(s => taskContext.output(
+      Message(serializer.serialize(s), message.timestamp)))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
index 8ddbedd..0052c57 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
@@ -19,10 +19,8 @@
 package org.apache.gearpump.streaming.examples.state.processor
 
 import scala.collection.immutable.TreeMap
-
 import com.twitter.algebird.{AveragedGroup, AveragedValue}
 import org.slf4j.Logger
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.monoid.AlgebirdGroup
@@ -48,7 +46,7 @@ class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig)
 
   override def processMessage(state: PersistentState[AveragedValue],
       message: Message): Unit = {
-    val value = AveragedValue(message.msg.asInstanceOf[String].toLong)
+    val value = AveragedValue(message.value.asInstanceOf[String].toLong)
     state.update(message.timestamp.toEpochMilli, value)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala
new file mode 100644
index 0000000..619e5d4
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.streaming.examples.state.MessageCountApp._
+
+class DefaultMessageCountAppSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+  before {
+    startActorSystem()
+  }
+
+  after {
+    shutdownActorSystem()
+  }
+
+  protected override def config = TestUtil.DEFAULT_CONFIG
+
+  property("MessageCount should succeed to submit application with required arguments") {
+    val requiredArgs = Array(
+      s"-$SOURCE_TOPIC", "source",
+      s"-$SINK_TOPIC", "sink",
+      s"-$ZOOKEEPER_CONNECT", "localhost:2181",
+      s"-$BROKER_LIST", "localhost:9092",
+      s"-$DEFAULT_FS", "hdfs://localhost:9000"
+    )
+    val optionalArgs = Array(
+      s"-$SOURCE_TASK", "2",
+      s"-$COUNT_TASK", "2",
+      s"-$SINK_TASK", "2"
+    )
+
+    val args = {
+      Table(
+        ("requiredArgs", "optionalArgs"),
+        (requiredArgs, optionalArgs.take(0)),
+        (requiredArgs, optionalArgs.take(2)),
+        (requiredArgs, optionalArgs.take(4)),
+        (requiredArgs, optionalArgs)
+      )
+    }
+
+    val masterReceiver = createMockMaster()
+    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
+      val args = requiredArgs ++ optionalArgs
+      Future {
+        MessageCountApp.main(masterConfig, args)
+      }
+      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+      masterReceiver.reply(SubmitApplicationResult(Success(0)))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
deleted file mode 100644
index 729994e..0000000
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.state
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
-import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-import org.apache.gearpump.streaming.examples.state.MessageCountApp._
-
-class MessageCountAppSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
-
-  before {
-    startActorSystem()
-  }
-
-  after {
-    shutdownActorSystem()
-  }
-
-  protected override def config = TestUtil.DEFAULT_CONFIG
-
-  property("MessageCount should succeed to submit application with required arguments") {
-    val requiredArgs = Array(
-      s"-$SOURCE_TOPIC", "source",
-      s"-$SINK_TOPIC", "sink",
-      s"-$ZOOKEEPER_CONNECT", "localhost:2181",
-      s"-$BROKER_LIST", "localhost:9092",
-      s"-$DEFAULT_FS", "hdfs://localhost:9000"
-    )
-    val optionalArgs = Array(
-      s"-$SOURCE_TASK", "2",
-      s"-$COUNT_TASK", "2",
-      s"-$SINK_TASK", "2"
-    )
-
-    val args = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs, optionalArgs.take(0)),
-        (requiredArgs, optionalArgs.take(2)),
-        (requiredArgs, optionalArgs.take(4)),
-        (requiredArgs, optionalArgs)
-      )
-    }
-
-    val masterReceiver = createMockMaster()
-    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
-      val args = requiredArgs ++ optionalArgs
-      Future {
-        MessageCountApp.main(masterConfig, args)
-      }
-      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-      masterReceiver.reply(SubmitApplicationResult(Success(0)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
index 5affb5e..158baeb 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -33,7 +33,6 @@ import org.scalatest.{Matchers, PropSpec}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.state.api.PersistentTask
 import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig}
 import org.apache.gearpump.streaming.task.UpdateCheckpointClock
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
index a0996b3..22425e3 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
@@ -18,6 +18,7 @@
 
 package org.apache.gearpump.streaming.examples.wordcountjava;
 
+import org.apache.gearpump.DefaultMessage;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.javaapi.Task;
@@ -45,7 +46,7 @@ public class Split extends Task {
     // Split the TEXT to words
     String[] words = TEXT.split(" ");
     for (int i = 0; i < words.length; i++) {
-      context.output(new Message(words[i], Instant.now().toEpochMilli()));
+      context.output(DefaultMessage.apply(words[i], Instant.now().toEpochMilli()));
     }
     self().tell(new Watermark(Instant.now()), self());
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
index 3daa6e0..41cdbdc 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
@@ -42,8 +42,8 @@ public class Sum extends Task {
   }
 
   @Override
-  public void onNext(Message messagePayLoad) {
-    String word = (String) (messagePayLoad.msg());
+  public void onNext(Message message) {
+    String word = (String) (message.value());
     Integer current = wordCount.get(word);
     if (current == null) {
       current = 0;

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index d8262fd..2830b16 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -19,6 +19,7 @@
 package org.apache.gearpump.streaming.examples.wordcountjava.dsl;
 
 import com.typesafe.config.Config;
+import org.apache.gearpump.DefaultMessage;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.UserConfig;
@@ -79,7 +80,7 @@ public class WordCount {
 
     @Override
     public Message read() {
-      return Message.apply(str, Instant.now());
+      return new DefaultMessage(str, Instant.now());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
index dbefc93..6f482fa 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
@@ -20,11 +20,10 @@ package org.apache.gearpump.streaming.examples.wordcount
 
 import java.time.Instant
 import java.util.concurrent.TimeUnit
+
 import scala.collection.mutable
 import scala.concurrent.duration.FiniteDuration
-
 import akka.actor.Cancellable
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
@@ -45,9 +44,9 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
 
   override def onNext(msg: Message): Unit = {
     if (null != msg) {
-      val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L)
+      val current = map.getOrElse(msg.value.asInstanceOf[String], 0L)
       wordCount += 1
-      map.put(msg.msg.asInstanceOf[String], current + 1)
+      map.put(msg.value.asInstanceOf[String], current + 1)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
index 5c2485b..3d412ff 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
@@ -18,15 +18,10 @@
 
 package org.apache.gearpump.akkastream.task
 
-import java.util.concurrent.TimeUnit
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-
 class BatchTask[In, Out](context: TaskContext, userConf : UserConfig)
   extends GraphTask(context, userConf) {
 
@@ -35,8 +30,8 @@ class BatchTask[In, Out](context: TaskContext, userConf : UserConfig)
   val aggregate = userConf.getValue[(Out, In) => Out](BatchTask.AGGREGATE)
   val seed = userConf.getValue[In => Out](BatchTask.SEED)
 
-  override def onNext(msg : Message) : Unit = {
-    val data = msg.msg.asInstanceOf[In]
+  override def onNext(msg: Message) : Unit = {
+    val data = msg.value.asInstanceOf[In]
     val time = msg.timestamp
     context.output(msg)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
index b77b9bd..94954c0 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
@@ -28,7 +28,7 @@ class ConcatTask(context: TaskContext, userConf : UserConfig)
   val sizeOfOutputs = sizeOfOutPorts
   var index = 0
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message) : Unit = {
     output(index, msg)
     index += 1
     if (index == sizeOfOutputs) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
index ae91d1f..602f732 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
@@ -25,7 +25,6 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 
 case object DelayInitialTime
@@ -42,8 +41,8 @@ class DelayInitialTask[T](context: TaskContext, userConf : UserConfig)
       self ! Message(DelayInitialTime, Instant.now())
     )
   }
-  override def onNext(msg : Message) : Unit = {
-    msg.msg match {
+  override def onNext(msg: Message) : Unit = {
+    msg.value match {
       case DelayInitialTime =>
         delayInitialActive = false
       case _ =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
index 4c19de5..c0756e3 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
@@ -42,8 +42,8 @@ class DropWithinTask[T](context: TaskContext, userConf : UserConfig)
     )
   }
 
-  override def onNext(msg : Message) : Unit = {
-    msg.msg match {
+  override def onNext(msg: Message) : Unit = {
+    msg.value match {
       case DropWithinTimeout =>
         timeoutActive = false
       case _ =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
index 14ff537..d3e815a 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
@@ -28,7 +28,7 @@ class FlattenMergeTask(context: TaskContext, userConf : UserConfig)
   val sizeOfOutputs = sizeOfOutPorts
   var index = 0
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message) : Unit = {
     output(index, msg)
     index += 1
     if (index == sizeOfOutputs) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
index d982ebd..8de9f8d 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
@@ -38,13 +38,13 @@ class FoldTask[In, Out](context: TaskContext, userConf : UserConfig)
     })
   }
 
-  override def onNext(msg : Message) : Unit = {
-    val data = msg.msg.asInstanceOf[In]
+  override def onNext(msg: Message) : Unit = {
+    val data = msg.value.asInstanceOf[In]
     val time = msg.timestamp
     aggregator.foreach(func => {
       aggregated = func(aggregated, data)
       LOG.info(s"aggregated = $aggregated")
-      val msg = new Message(aggregated, time)
+      val msg = Message(aggregated, time)
       context.output(msg)
     })
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
index eaf2b3f..12e2d40 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
@@ -33,7 +33,7 @@ class GroupedWithinTask[T](context: TaskContext, userConf : UserConfig)
   val timeWindow = userConf.getValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW)
   val batchSize = userConf.getInt(GroupedWithinTask.BATCH_SIZE)
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message) : Unit = {
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
index 741ec43..908e21e 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
@@ -29,7 +29,7 @@ class InterleaveTask(context: TaskContext, userConf : UserConfig)
   var index = 0
 
   // TODO access upstream and pull
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message) : Unit = {
     output(index, msg)
     index += 1
     if (index == sizeOfInputs) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
index daa1afc..c500ba2 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
@@ -30,15 +30,15 @@ class MapAsyncTask[In, Out](context: TaskContext, userConf : UserConfig)
   val f = userConf.getValue[In => Future[Out]](MapAsyncTask.MAPASYNC_FUNC)
   implicit val ec = context.system.dispatcher
 
-  override def onNext(msg : Message) : Unit = {
-    val data = msg.msg.asInstanceOf[In]
+  override def onNext(msg: Message) : Unit = {
+    val data = msg.value.asInstanceOf[In]
     val time = msg.timestamp
     f match {
       case Some(func) =>
         val fout = func(data)
         fout.onComplete(value => {
           value.foreach(out => {
-            val msg = new Message(out, time)
+            val msg = Message(out, time)
             context.output(msg)
           })
         })

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
index ad18f72..1ecc4d0 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
@@ -28,7 +28,7 @@ class MergeTask(context: TaskContext, userConf : UserConfig)
   val eagerComplete = userConf.getBoolean(MergeTask.EAGER_COMPLETE)
   val inputPorts = userConf.getInt(MergeTask.INPUT_PORTS)
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message) : Unit = {
     context.output(msg)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
index 5bea47e..bff4e76 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
@@ -27,7 +27,7 @@ class SingleSourceTask[T](context: TaskContext, userConf : UserConfig)
 
   val elem = userConf.getValue[T](SingleSourceTask.ELEMENT).get
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message) : Unit = {
     context.output(Message(elem, msg.timestamp))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
index 1b9c4e3..d92e24c 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
@@ -61,7 +61,7 @@ class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig)
 
   override def onStart(startTime : Instant) : Unit = {}
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message) : Unit = {
     queue.add(msg)
     trySendingData()
   }
@@ -71,7 +71,7 @@ class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig)
   private def trySendingData(): Unit = {
     if (subscriber != null) {
       (0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg =>
-        subscriber ! msg.msg
+        subscriber ! msg.value
         request -= 1
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
index 5b64a52..b7fd9c3 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
@@ -53,7 +53,7 @@ class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig)
 
   override def onStart(startTime : Instant) : Unit = {}
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message) : Unit = {
     LOG.info("AkkaStreamSource receiving message " + msg)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
index b776f2c..d8beeb5 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
@@ -34,8 +34,8 @@ class StatefulMapConcatTask[IN, OUT](context: TaskContext, userConf : UserConfig
     f = func()
   }
 
-  override def onNext(msg : Message) : Unit = {
-    val in: IN = msg.msg.asInstanceOf[IN]
+  override def onNext(msg: Message) : Unit = {
+    val in: IN = msg.value.asInstanceOf[IN]
     val out: Iterable[OUT] = f(in)
     val iterator = out.iterator
     while(iterator.hasNext) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
index 7aa4e8e..689a6b3 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
@@ -42,8 +42,8 @@ class TakeWithinTask[T](context: TaskContext, userConf : UserConfig)
     )
   }
 
-  override def onNext(msg : Message) : Unit = {
-    msg.msg match {
+  override def onNext(msg: Message) : Unit = {
+    msg.value match {
       case DropWithinTimeout =>
         timeoutActive = true
       case _ =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
index 3c7ad87..ef1e35f 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
@@ -24,7 +24,6 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 
 class ThrottleTask[T](context: TaskContext, userConf : UserConfig)
@@ -38,8 +37,8 @@ class ThrottleTask[T](context: TaskContext, userConf : UserConfig)
   val interval = timePeriod.toNanos / cost
 
   // TODO control rate from TaskActor
-  override def onNext(msg : Message) : Unit = {
-    val data = msg.msg.asInstanceOf[T]
+  override def onNext(msg: Message) : Unit = {
+    val data = msg.value.asInstanceOf[T]
     val time = msg.timestamp
     context.output(msg)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
index a10e138..086fd48 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
@@ -19,7 +19,6 @@
 package org.apache.gearpump.akkastream.task
 
 import java.time.Instant
-import java.util.Date
 import java.util.concurrent.TimeUnit
 
 import org.apache.gearpump.Message
@@ -33,7 +32,6 @@ class TickSourceTask[T](context: TaskContext, userConf : UserConfig)
 
   val initialDelay = userConf.getValue[FiniteDuration](TickSourceTask.INITIAL_DELAY).
     getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
-  (TickSourceTask.INITIAL_DELAY)
   val interval = userConf.getValue[FiniteDuration](TickSourceTask.INTERVAL).
     getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
   val tick = userConf.getValue[T](TickSourceTask.TICK).get
@@ -44,7 +42,7 @@ class TickSourceTask[T](context: TaskContext, userConf : UserConfig)
     )
   }
 
-  override def onNext(msg : Message) : Unit = {
+  override def onNext(msg: Message) : Unit = {
     context.output(msg)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
index 005d018..7f39ed7 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
@@ -29,10 +29,10 @@ class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig)
   val unzip = userConf.
     getValue[UnZipFunction[In, A1, A2]](Unzip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
 
-  override def onNext(msg : Message) : Unit = {
-    val message = msg.msg
+  override def onNext(msg: Message) : Unit = {
+    val value = msg.value
     val time = msg.timestamp
-    val pair = unzip(message.asInstanceOf[In])
+    val pair = unzip(value.asInstanceOf[In])
     val (a, b) = pair
     output(0, Message(a.asInstanceOf[AnyRef], time))
     output(1, Message(b.asInstanceOf[AnyRef], time))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
index 7e0c082..ab0116c 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
@@ -31,8 +31,8 @@ class Zip2Task[A1, A2, OUT](context: TaskContext, userConf : UserConfig)
   var a1: Option[A1] = None
   var a2: Option[A2] = None
 
-  override def onNext(msg : Message) : Unit = {
-    val message = msg.msg
+  override def onNext(msg: Message) : Unit = {
+    val message = msg.value
     val time = msg.timestamp
     a1 match {
       case Some(x) =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
----------------------------------------------------------------------
diff --git a/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
index 492fffe..41c6499 100644
--- a/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
+++ b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
@@ -49,7 +49,7 @@ class RMQSink(userConfig: UserConfig,
   }
 
   override def write(message: Message): Unit = {
-    publish(message.msg)
+    publish(message.value)
   }
 
   override def close(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
index 36a9fe3..9afb1fe 100644
--- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
@@ -59,7 +59,7 @@ class RedisSink(
   }
 
   override def write(message: Message): Unit = {
-    message.msg match {
+    message.value match {
       // GEO
       case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
index 4969314..86fc0ec 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
@@ -42,7 +42,7 @@ private[storm] class StormPartitioner(target: String) extends MulticastPartition
 
   override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int)
     : Array[Int] = {
-    val stormTuple = msg.msg.asInstanceOf[GearpumpTuple]
+    val stormTuple = msg.value.asInstanceOf[GearpumpTuple]
     stormTuple.targetPartitions.getOrElse(target, Array(Partitioner.UNKNOWN_PARTITION_ID))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
index e3b45fb..2f1b77b 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
@@ -20,8 +20,8 @@ package org.apache.gearpump.experiments.storm.processor
 
 import java.time.Instant
 import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.Duration
 
+import scala.concurrent.duration.Duration
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index 4536277..248ca44 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.experiments.storm.util.StormConstants._
 import org.apache.gearpump.experiments.storm.util.StormUtil._
 import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil}
 import org.apache.gearpump.streaming.DAG
-import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext}
+import org.apache.gearpump.streaming.task.{GetDAG, TaskContext, TaskId}
 import org.apache.gearpump.util.{Constants, LogUtil}
 import org.apache.gearpump.{Message, TimeStamp}
 import org.slf4j.Logger
@@ -219,7 +219,7 @@ object GearpumpStormComponent {
     override def next(message: Message): Unit = {
       val timestamp = message.timestamp.toEpochMilli
       collector.setTimestamp(timestamp)
-      bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext,
+      bolt.execute(message.value.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext,
         timestamp))
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
index 5fc631b..aabb7c1 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
@@ -52,7 +52,8 @@ class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers {
         targetPartitions.foreach {
           case (target, ps) => {
             val partitioner = new StormPartitioner(target)
-            ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1, currentPartitionId)
+            ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1,
+              currentPartitionId)
           }
         }
         val partitionNum = id

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
index 6b894da..05627c9 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -18,8 +18,8 @@
 package org.apache.gearpump.experiments.storm.util
 
 import java.util.{List => JList, Map => JMap}
-import scala.collection.JavaConverters._
 
+import scala.collection.JavaConverters._
 import backtype.storm.generated.Grouping
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -27,8 +27,7 @@ import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
-
-import org.apache.gearpump.{Message, MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
 import org.apache.gearpump.streaming.MockUtil
 
@@ -63,9 +62,9 @@ class StormOutputCollectorSpec
         stormOutputCollector.setTimestamp(timestamp)
         stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds
         verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
-          case Message(tuple: GearpumpTuple, t) =>
+          message: Message =>
             val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
-            tuple == expected && t == timestamp
+            message.value == expected && message.timestamp.toEpochMilli == timestamp
         }))
     }
   }
@@ -96,11 +95,11 @@ class StormOutputCollectorSpec
         stormOutputCollector.emitDirect(id, streamId, values)
         val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index)
         verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
-          case Message(tuple: GearpumpTuple, t) => {
+          message: Message => {
             val expected = new GearpumpTuple(values, stormTaskId, streamId,
             Map(target -> partitions))
 
-            val result = tuple == expected && t == timestamp
+            val result = message.value == expected && message.timestamp.toEpochMilli == timestamp
             result
           }
         }))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
index bb56003..7d0838e 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
@@ -22,7 +22,6 @@ import java.text.SimpleDateFormat
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hdfs.HdfsConfiguration
 import org.apache.hadoop.io.SequenceFile
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
index 822eb5f..04e4781 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
@@ -18,13 +18,12 @@
 package org.apache.gearpump.streaming.hadoop.lib.format
 
 import org.apache.hadoop.io.{LongWritable, Text, Writable}
-
 import org.apache.gearpump.Message
 
 class DefaultSequenceFormatter extends OutputFormatter {
   override def getKey(message: Message): Writable = new LongWritable(message.timestamp.toEpochMilli)
 
-  override def getValue(message: Message): Writable = new Text(message.msg.asInstanceOf[String])
+  override def getValue(message: Message): Writable = new Text(message.value.asInstanceOf[String])
 
   override def getKeyClass: Class[_ <: Writable] = classOf[LongWritable]
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
index 435d0fc..2e5874c 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
@@ -18,7 +18,6 @@
 package org.apache.gearpump.streaming.hadoop.lib.format
 
 import org.apache.hadoop.io.Writable
-
 import org.apache.gearpump.Message
 
 trait OutputFormatter extends Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
index 4b41ba1..f5e6483 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
@@ -84,7 +84,7 @@ class HBaseSink(userConfig: UserConfig, tableName: String,
   }
 
   override def write(message: Message): Unit = {
-    put(message.msg)
+    put(message.value)
   }
 
   def close(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
index e5534a6..76b4c0b 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
@@ -68,7 +68,7 @@ abstract class AbstractKafkaSink private[kafka](
   }
 
   override def write(message: Message): Unit = {
-    message.msg match {
+    message.value match {
       case (k: Array[Byte], v: Array[Byte]) =>
         val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, k, v)
         producer.send(record)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index a8bda50..d5a8729 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -28,7 +28,7 @@ import org.apache.gearpump.streaming.kafka.lib.KafkaMessageDecoder
 import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory
 import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
 import KafkaClient.KafkaClientFactory
-import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, FetchThread}
+import org.apache.gearpump.streaming.kafka.lib.source.consumer.{FetchThread, KafkaMessage}
 import org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper
 import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
 import org.apache.gearpump.streaming.kafka.util.KafkaConfig
@@ -97,6 +97,7 @@ abstract class AbstractKafkaSource(
   /**
    * Reads a record from incoming queue, decodes, filters and checkpoints offsets
    * before returns a Message. Message can be null if the incoming queue is empty.
+ *
    * @return a [[org.apache.gearpump.Message]] or null
    */
   override def read(): Message = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
index 9f29022..2b52d76 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
@@ -18,8 +18,6 @@
 
 package org.apache.gearpump.streaming.kafka.lib.source
 
-import java.time.Instant
-
 import com.twitter.bijection.Injection
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
@@ -34,7 +32,7 @@ class DefaultKafkaMessageDecoderSpec extends PropSpec with PropertyChecks with M
       val msgAndWmk = decoder.fromBytes(kbytes, vbytes)
       val message = msgAndWmk.message
       val watermark = msgAndWmk.watermark
-      message.msg shouldBe vbytes
+      message.value shouldBe vbytes
       // processing time as message timestamp and watermark
       message.timestamp shouldBe watermark
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
index 7e1214e..3789d4e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.partitioner.UnicastPartitioner
 class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner {
 
   override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    val hashCode = fn(message.msg.asInstanceOf[T]).hashCode()
+    val hashCode = fn(message.value.asInstanceOf[T]).hashCode()
     (hashCode & Integer.MAX_VALUE) % partitionNum
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index 9a614e8..9c5e347 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -291,7 +291,7 @@ class LoggerSink[T] extends DataSink {
   }
 
   override def write(message: Message): Unit = {
-    logger.info("logging message " + message.msg)
+    logger.info("logging message " + message.value)
   }
 
   override def close(): Unit = Unit

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 86ac933..9571697 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -42,18 +42,19 @@ object TransformTask {
       val processor = operator.map(FunctionRunner.withEmitFn(_,
         (out: OUT) => taskContext.output(Message(out, watermarkTime))))
       processor.foreach(_.setup())
-      buffer.foreach { case message@Message(in, time) =>
-        if (time < watermarkTime) {
-          processor match {
-            case Some(p) =>
-              // .toList forces eager evaluation
-              p.process(in.asInstanceOf[IN]).toList
-            case None =>
-              taskContext.output(Message(in, watermarkTime))
+      buffer.foreach {
+        message: Message =>
+          if (message.timestamp.toEpochMilli < watermarkTime) {
+            processor match {
+              case Some(p) =>
+                // .toList forces eager evaluation
+                p.process(message.value.asInstanceOf[IN]).toList
+              case None =>
+                taskContext.output(Message(message.value, watermarkTime))
+            }
+          } else {
+            nextBuffer +:= message
           }
-        } else {
-          nextBuffer +:= message
-        }
       }
       // .toList forces eager evaluation
       processor.map(_.finish().toList)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 5f9d19b..05ce74e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -68,7 +68,7 @@ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Windo
 case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) {
 
   def groupBy(message: Message): (GROUP, List[Window]) = {
-    val ele = message.msg.asInstanceOf[T]
+    val ele = message.value.asInstanceOf[T]
     val group = groupByFn(ele)
     val windows = window.windowFn(new WindowFunction.Context[T] {
       override def element: T = ele

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 42d50e2..74749b9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -58,7 +58,7 @@ class DefaultWindowRunner[IN, GROUP, OUT](
   private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
 
   override def process(message: Message): Unit = {
-    val input = message.msg.asInstanceOf[IN]
+    val input = message.value.asInstanceOf[IN]
     val (group, windows) = groupBy.groupBy(message)
     if (!groupedWindowInputs.containsKey(group)) {
       groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]())

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
index 9b63e04..36c331a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
@@ -25,8 +25,8 @@ class BroadcastPartitioner extends MulticastPartitioner {
   private var lastPartitionNum = -1
   private var partitions = Array.empty[Int]
 
-  override def getPartitions(
-      msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+  override def getPartitions(msg: Message, partitionNum: Int,
+      currentPartitionId: Int): Array[Int] = {
     if (partitionNum != lastPartitionNum) {
       partitions = (0 until partitionNum).toArray
       lastPartitionNum = partitionNum

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
index 6137705..dc48741 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
@@ -27,6 +27,6 @@ import org.apache.gearpump.Message
  */
 class HashPartitioner extends UnicastPartitioner {
   override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
+    (msg.value.hashCode() & Integer.MAX_VALUE) % partitionNum
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 4371257..0ec2b6f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -19,8 +19,7 @@ package org.apache.gearpump.streaming.source
 
 import java.time.Instant
 
-import org.apache.gearpump.Message
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS}
+import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message}
 
 /**
  * message used by source task to report source watermark.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index f9f5b33..d3ffaa9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -22,7 +22,7 @@ import java.time.Instant
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import org.apache.gearpump.util.LogUtil
 import org.apache.gearpump.{Message, TimeStamp}
@@ -79,7 +79,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
   }
 
   final override def onNext(message: Message): Unit = {
-    checkpointManager.update(message.timeInMillis)
+    checkpointManager.update(message.timestamp.toEpochMilli)
       .foreach(state.setNextCheckpointTime)
     processMessage(state, message)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
index 3cc1cd0..6947dd4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
@@ -19,7 +19,6 @@
 package org.apache.gearpump.streaming.task
 
 import akka.actor.{ActorRef, ExtendedActorSystem}
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.transport.netty.TaskMessage
 import org.apache.gearpump.transport.{Express, HostPort}
@@ -55,8 +54,8 @@ trait ExpressTransport {
         if (null == serializedMessage) {
           msg match {
             case message: Message =>
-              val bytes = serializerPool.get().serialize(message.msg)
-              serializedMessage = SerializedMessage(message.timeInMillis, bytes)
+              val bytes = serializerPool.get().serialize(message.value)
+              serializedMessage = SerializedMessage(message.timestamp.toEpochMilli, bytes)
             case _ => serializedMessage = msg
           }
         }