You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:41 UTC

[31/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
new file mode 100644
index 0000000..796b0d2
--- /dev/null
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.sol
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.Cancellable
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+  import taskContext.output
+
+  val taskConf = taskContext
+
+  private var msgCount: Long = 0
+  private var scheduler: Cancellable = null
+  private var snapShotWordCount: Long = 0
+  private var snapShotTime: Long = 0
+
+  override def onStart(startTime: StartTime): Unit = {
+    scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
+      new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount())
+    snapShotTime = System.currentTimeMillis()
+  }
+
+  override def onNext(msg: Message): Unit = {
+    output(msg)
+    msgCount = msgCount + 1
+  }
+
+  override def onStop(): Unit = {
+    if (scheduler != null) {
+      scheduler.cancel()
+    }
+  }
+
+  def reportWordCount(): Unit = {
+    val current: Long = System.currentTimeMillis()
+    LOG.info(s"Task ${taskConf.taskId} " +
+      s"Throughput: ${(msgCount - snapShotWordCount, (current - snapShotTime) / 1000)} " +
+      s"(words, second)")
+    snapShotWordCount = msgCount
+    snapShotTime = current
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
new file mode 100644
index 0000000..84ed038
--- /dev/null
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.sol
+
+import java.util.Random
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.examples.sol.SOLStreamProducer._
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  import taskContext.output
+
+  private val sizeInBytes = conf.getInt(SOLStreamProducer.BYTES_PER_MESSAGE)
+    .getOrElse(DEFAULT_MESSAGE_SIZE)
+  private var messages: Array[String] = null
+  private var rand: Random = null
+  private var messageCount: Long = 0
+
+  override def onStart(startTime: StartTime): Unit = {
+    prepareRandomMessage
+    self ! Start
+  }
+
+  private def prepareRandomMessage = {
+    rand = new Random()
+    val differentMessages = 100
+    messages = new Array(differentMessages)
+
+    0.until(differentMessages).map { index =>
+      val sb = new StringBuilder(sizeInBytes)
+      // Even though java encodes strings in UCS2, the serialized version sent by the tuples
+      // is UTF8, so it should be a single byte
+      0.until(sizeInBytes).foldLeft(sb) { (sb, j) =>
+        sb.append(rand.nextInt(9))
+      }
+      messages(index) = sb.toString()
+    }
+  }
+
+  override def onNext(msg: Message): Unit = {
+    val message = messages(rand.nextInt(messages.length))
+    output(new Message(message, System.currentTimeMillis()))
+    messageCount = messageCount + 1L
+    self ! messageSourceMinClock
+  }
+
+  // messageSourceMinClock represent the min clock of the message source
+  private def messageSourceMinClock: Message = {
+    Message("tick", System.currentTimeMillis())
+  }
+}
+
+object SOLStreamProducer {
+  val DEFAULT_MESSAGE_SIZE = 100
+  // Bytes
+  val BYTES_PER_MESSAGE = "bytesPerMessage"
+  val Start = Message("start")
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
deleted file mode 100644
index 6e266d0..0000000
--- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.sol
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import com.typesafe.config.Config
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class SOLSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
-  override def beforeAll {
-    startActorSystem()
-  }
-
-  override def afterAll {
-    shutdownActorSystem()
-  }
-
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  property("SOL should succeed to submit application with required arguments") {
-    val requiredArgs = Array.empty[String]
-    val optionalArgs = Array(
-      "-streamProducer", "1",
-      "-streamProcessor", "1",
-      "-bytesPerMessage", "100",
-      "-stages", "10")
-
-    val args = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs, optionalArgs)
-      )
-    }
-    val masterReceiver = createMockMaster()
-    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
-      val args = requiredArgs ++ optionalArgs
-
-      Future {
-        SOL.main(masterConfig, args)
-      }
-
-      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-      masterReceiver.reply(SubmitApplicationResult(Success(0)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
deleted file mode 100644
index f5035b4..0000000
--- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.sol
-
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class SOLStreamProcessorSpec extends FlatSpec with Matchers {
-
-  it should "pass the message downstream" in {
-    val stringGenerator = Gen.alphaStr
-    val context = MockUtil.mockTaskContext
-
-    val sol = new SOLStreamProcessor(context, UserConfig.empty)
-    sol.onStart(StartTime(0))
-    val msg = Message("msg")
-    sol.onNext(msg)
-    verify(context, times(1)).output(msg)
-    sol.onStop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
deleted file mode 100644
index 4bac30c..0000000
--- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.sol
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class SOLStreamProducerSpec extends WordSpec with Matchers {
-
-  "SOLStreamProducer" should {
-    "producer message continuously" in {
-
-      val conf = UserConfig.empty.withInt(SOLStreamProducer.BYTES_PER_MESSAGE, 100)
-      val context = MockUtil.mockTaskContext
-
-      val producer = new SOLStreamProducer(context, conf)
-      producer.onStart(StartTime(0))
-      producer.onNext(Message("msg"))
-      verify(context).output(any[Message])
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala
new file mode 100644
index 0000000..29e8284
--- /dev/null
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.sol
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+class SOLSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
+  override def beforeAll {
+    startActorSystem()
+  }
+
+  override def afterAll {
+    shutdownActorSystem()
+  }
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  property("SOL should succeed to submit application with required arguments") {
+    val requiredArgs = Array.empty[String]
+    val optionalArgs = Array(
+      "-streamProducer", "1",
+      "-streamProcessor", "1",
+      "-bytesPerMessage", "100",
+      "-stages", "10")
+
+    val args = {
+      Table(
+        ("requiredArgs", "optionalArgs"),
+        (requiredArgs, optionalArgs)
+      )
+    }
+    val masterReceiver = createMockMaster()
+    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
+      val args = requiredArgs ++ optionalArgs
+
+      Future {
+        SOL.main(masterConfig, args)
+      }
+
+      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+      masterReceiver.reply(SubmitApplicationResult(Success(0)))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
new file mode 100644
index 0000000..a6cc966
--- /dev/null
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.sol
+
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
+class SOLStreamProcessorSpec extends FlatSpec with Matchers {
+
+  it should "pass the message downstream" in {
+    val stringGenerator = Gen.alphaStr
+    val context = MockUtil.mockTaskContext
+
+    val sol = new SOLStreamProcessor(context, UserConfig.empty)
+    sol.onStart(StartTime(0))
+    val msg = Message("msg")
+    sol.onNext(msg)
+    verify(context, times(1)).output(msg)
+    sol.onStop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
new file mode 100644
index 0000000..2316de8
--- /dev/null
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.sol
+
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
+class SOLStreamProducerSpec extends WordSpec with Matchers {
+
+  "SOLStreamProducer" should {
+    "producer message continuously" in {
+
+      val conf = UserConfig.empty.withInt(SOLStreamProducer.BYTES_PER_MESSAGE, 100)
+      val context = MockUtil.mockTaskContext
+
+      val producer = new SOLStreamProducer(context, conf)
+      producer.onStart(StartTime(0))
+      producer.onNext(Message("msg"))
+      verify(context).output(any[Message])
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/resources/state.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/resources/state.conf b/examples/streaming/state/src/main/resources/state.conf
index 7e26d4b..a7eec2b 100644
--- a/examples/streaming/state/src/main/resources/state.conf
+++ b/examples/streaming/state/src/main/resources/state.conf
@@ -1,6 +1,6 @@
 state {
   checkpoint {
     interval = 1000 # milliseconds
-    store.factory = io.gearpump.streaming.kafka.KafkaCheckpointStoreFactory
+    store.factory = org.apache.gearpump.streaming.kafka.KafkaCheckpointStoreFactory
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
deleted file mode 100644
index 3c6cde6..0000000
--- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state
-
-import akka.actor.ActorSystem
-import org.apache.hadoop.conf.Configuration
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.examples.state.processor.CountProcessor
-import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
-import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
-import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory}
-import io.gearpump.streaming.sink.DataSinkProcessor
-import io.gearpump.streaming.source.DataSourceProcessor
-import io.gearpump.streaming.state.impl.PersistentStateConfig
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph.Node
-import io.gearpump.util.{AkkaApp, Graph}
-
-/** Does exactly-once message count */
-object MessageCountApp extends AkkaApp with ArgumentsParser {
-  val SOURCE_TASK = "sourceTask"
-  val COUNT_TASK = "countTask"
-  val SINK_TASK = "sinkTask"
-  val SOURCE_TOPIC = "sourceTopic"
-  val SINK_TOPIC = "sinkTopic"
-  val ZOOKEEPER_CONNECT = "zookeeperConnect"
-  val BROKER_LIST = "brokerList"
-  val DEFAULT_FS = "defaultFS"
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false,
-    defaultValue = Some(1)),
-    COUNT_TASK -> CLIOption("<how many count tasks>", required = false, defaultValue = Some(1)),
-    SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
-    defaultValue = Some(1)),
-    SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true),
-    SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true),
-    ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
-    required = true),
-    BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
-    DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. hdfs://localhost:9000>",
-      required = true)
-  )
-
-  def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
-    val hadoopConfig = new Configuration
-    hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS))
-    val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig,
-      // Rotates on 1KB
-      new FileSizeRotation(1000))
-    val taskConfig = UserConfig.empty
-      .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
-      .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
-      .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory)
-
-    val zookeeperConnect = config.getString(ZOOKEEPER_CONNECT)
-    val brokerList = config.getString(BROKER_LIST)
-    val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList)
-    val sourceTopic = config.getString(SOURCE_TOPIC)
-    val kafkaSource = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory)
-    val sourceProcessor = DataSourceProcessor(kafkaSource, config.getInt(SOURCE_TASK))
-    val countProcessor = Processor[CountProcessor](config.getInt(COUNT_TASK), taskConf = taskConfig)
-    val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList)
-    val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK))
-    val partitioner = new HashPartitioner()
-    val graph = Graph(sourceProcessor ~ partitioner
-      ~> countProcessor ~ partitioner ~> sinkProcessor)
-    val app = StreamApplication("MessageCount", graph, UserConfig.empty)
-    app
-  }
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-    implicit val system = context.system
-    val appId = context.submit(application(config))
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
deleted file mode 100644
index 6f3bc79..0000000
--- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state
-
-import akka.actor.ActorSystem
-import org.apache.hadoop.conf.Configuration
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor, WindowAverageProcessor}
-import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
-import io.gearpump.streaming.state.impl.{PersistentStateConfig, WindowConfig}
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph.Node
-import io.gearpump.util.{AkkaApp, Graph}
-
-/** Does exactly-once sliding window based average aggregation */
-object WindowAverageApp extends AkkaApp with ArgumentsParser {
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "gen" -> CLIOption("<how many gen tasks>", required = false, defaultValue = Some(1)),
-    "window" -> CLIOption("<how mange window tasks", required = false, defaultValue = Some(1)),
-    "window_size" -> CLIOption("<window size in milliseconds>", required = false,
-      defaultValue = Some(5000)),
-    "window_step" -> CLIOption("<window step in milliseconds>", required = false,
-      defaultValue = Some(5000))
-  )
-
-  def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
-    val windowSize = config.getInt("window_size")
-    val windowStep = config.getInt("window_step")
-    val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", new Configuration)
-    val taskConfig = UserConfig.empty.
-      withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
-      .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
-      .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory)
-      .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
-    val gen = Processor[NumberGeneratorProcessor](config.getInt("gen"))
-    val count = Processor[WindowAverageProcessor](config.getInt("window"), taskConf = taskConfig)
-    val partitioner = new HashPartitioner()
-    val app = StreamApplication("WindowAverage", Graph(gen ~ partitioner ~> count),
-      UserConfig.empty)
-    app
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-    val context = ClientContext(akkaConf)
-
-    implicit val system = context.system
-    val appId = context.submit(application(config))
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
deleted file mode 100644
index 6610b91..0000000
--- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.monoid.AlgebirdMonoid
-import io.gearpump.streaming.serializer.ChillSerializer
-import io.gearpump.streaming.state.api.{PersistentState, PersistentTask}
-import io.gearpump.streaming.state.impl.NonWindowState
-import io.gearpump.streaming.task.TaskContext
-
-class CountProcessor(taskContext: TaskContext, conf: UserConfig)
-  extends PersistentTask[Int](taskContext, conf) {
-
-  override def persistentState: PersistentState[Int] = {
-    import com.twitter.algebird.Monoid.intMonoid
-    new NonWindowState[Int](new AlgebirdMonoid(intMonoid), new ChillSerializer[Int])
-  }
-
-  override def processMessage(state: PersistentState[Int], message: Message): Unit = {
-    state.update(message.timestamp, 1)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
deleted file mode 100644
index fa9854f..0000000
--- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
-  extends Task(taskContext, conf) {
-  import taskContext.output
-
-  private var num = 0L
-  override def onStart(startTime: StartTime): Unit = {
-    num = startTime.startTime
-    self ! Message("start")
-  }
-
-  override def onNext(msg: Message): Unit = {
-    output(Message(num + "", num))
-    num += 1
-
-    import scala.concurrent.duration._
-    taskContext.scheduleOnce(Duration(1, MILLISECONDS))(self ! Message("next"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
deleted file mode 100644
index e7ac9b3..0000000
--- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import scala.collection.immutable.TreeMap
-
-import com.twitter.algebird.{AveragedGroup, AveragedValue}
-import org.slf4j.Logger
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.monoid.AlgebirdGroup
-import io.gearpump.streaming.serializer.ChillSerializer
-import io.gearpump.streaming.state.api.{PersistentState, PersistentTask}
-import io.gearpump.streaming.state.impl.{Interval, Window, WindowConfig, WindowState}
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.util.LogUtil
-
-object WindowAverageProcessor {
-  val LOG: Logger = LogUtil.getLogger(classOf[WindowAverageProcessor])
-}
-
-class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig)
-  extends PersistentTask[AveragedValue](taskContext, conf) {
-
-  override def persistentState: PersistentState[AveragedValue] = {
-    val group = new AlgebirdGroup(AveragedGroup)
-    val serializer = new ChillSerializer[TreeMap[Interval, AveragedValue]]
-    val window = new Window(conf.getValue[WindowConfig](WindowConfig.NAME).get)
-    new WindowState[AveragedValue](group, serializer, taskContext, window)
-  }
-
-  override def processMessage(state: PersistentState[AveragedValue],
-      message: Message): Unit = {
-    val value = AveragedValue(message.msg.asInstanceOf[String].toLong)
-    state.update(message.timestamp, value)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
new file mode 100644
index 0000000..13bef0d
--- /dev/null
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state
+
+import akka.actor.ActorSystem
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.examples.state.processor.CountProcessor
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory}
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.DataSourceProcessor
+import org.apache.gearpump.streaming.state.impl.PersistentStateConfig
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/** Does exactly-once message count */
+object MessageCountApp extends AkkaApp with ArgumentsParser {
+  val SOURCE_TASK = "sourceTask"
+  val COUNT_TASK = "countTask"
+  val SINK_TASK = "sinkTask"
+  val SOURCE_TOPIC = "sourceTopic"
+  val SINK_TOPIC = "sinkTopic"
+  val ZOOKEEPER_CONNECT = "zookeeperConnect"
+  val BROKER_LIST = "brokerList"
+  val DEFAULT_FS = "defaultFS"
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false,
+    defaultValue = Some(1)),
+    COUNT_TASK -> CLIOption("<how many count tasks>", required = false, defaultValue = Some(1)),
+    SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
+    defaultValue = Some(1)),
+    SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true),
+    SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true),
+    ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
+    required = true),
+    BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
+    DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. hdfs://localhost:9000>",
+      required = true)
+  )
+
+  def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
+    val hadoopConfig = new Configuration
+    hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS))
+    val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig,
+      // Rotates on 1KB
+      new FileSizeRotation(1000))
+    val taskConfig = UserConfig.empty
+      .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+      .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+      .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory)
+
+    val zookeeperConnect = config.getString(ZOOKEEPER_CONNECT)
+    val brokerList = config.getString(BROKER_LIST)
+    val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList)
+    val sourceTopic = config.getString(SOURCE_TOPIC)
+    val kafkaSource = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory)
+    val sourceProcessor = DataSourceProcessor(kafkaSource, config.getInt(SOURCE_TASK))
+    val countProcessor = Processor[CountProcessor](config.getInt(COUNT_TASK), taskConf = taskConfig)
+    val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList)
+    val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK))
+    val partitioner = new HashPartitioner()
+    val graph = Graph(sourceProcessor ~ partitioner
+      ~> countProcessor ~ partitioner ~> sinkProcessor)
+    val app = StreamApplication("MessageCount", graph, UserConfig.empty)
+    app
+  }
+
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    implicit val system = context.system
+    val appId = context.submit(application(config))
+    context.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
new file mode 100644
index 0000000..629deb7
--- /dev/null
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state
+
+import akka.actor.ActorSystem
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor, WindowAverageProcessor}
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import org.apache.gearpump.streaming.state.impl.{PersistentStateConfig, WindowConfig}
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/** Does exactly-once sliding window based average aggregation */
+object WindowAverageApp extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gen" -> CLIOption("<how many gen tasks>", required = false, defaultValue = Some(1)),
+    "window" -> CLIOption("<how mange window tasks", required = false, defaultValue = Some(1)),
+    "window_size" -> CLIOption("<window size in milliseconds>", required = false,
+      defaultValue = Some(5000)),
+    "window_step" -> CLIOption("<window step in milliseconds>", required = false,
+      defaultValue = Some(5000))
+  )
+
+  def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
+    val windowSize = config.getInt("window_size")
+    val windowStep = config.getInt("window_step")
+    val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", new Configuration)
+    val taskConfig = UserConfig.empty.
+      withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+      .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+      .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory)
+      .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
+    val gen = Processor[NumberGeneratorProcessor](config.getInt("gen"))
+    val count = Processor[WindowAverageProcessor](config.getInt("window"), taskConf = taskConfig)
+    val partitioner = new HashPartitioner()
+    val app = StreamApplication("WindowAverage", Graph(gen ~ partitioner ~> count),
+      UserConfig.empty)
+    app
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+
+    implicit val system = context.system
+    val appId = context.submit(application(config))
+    context.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
new file mode 100644
index 0000000..9650a0a
--- /dev/null
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.monoid.AlgebirdMonoid
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+import org.apache.gearpump.streaming.state.api.{PersistentState, PersistentTask}
+import org.apache.gearpump.streaming.state.impl.NonWindowState
+import org.apache.gearpump.streaming.task.TaskContext
+
+class CountProcessor(taskContext: TaskContext, conf: UserConfig)
+  extends PersistentTask[Int](taskContext, conf) {
+
+  override def persistentState: PersistentState[Int] = {
+    import com.twitter.algebird.Monoid.intMonoid
+    new NonWindowState[Int](new AlgebirdMonoid(intMonoid), new ChillSerializer[Int])
+  }
+
+  override def processMessage(state: PersistentState[Int], message: Message): Unit = {
+    state.update(message.timestamp, 1)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
new file mode 100644
index 0000000..0e85f32
--- /dev/null
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+  import taskContext.output
+
+  private var num = 0L
+  override def onStart(startTime: StartTime): Unit = {
+    num = startTime.startTime
+    self ! Message("start")
+  }
+
+  override def onNext(msg: Message): Unit = {
+    output(Message(num + "", num))
+    num += 1
+
+    import scala.concurrent.duration._
+    taskContext.scheduleOnce(Duration(1, MILLISECONDS))(self ! Message("next"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
new file mode 100644
index 0000000..eea2504
--- /dev/null
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import scala.collection.immutable.TreeMap
+
+import com.twitter.algebird.{AveragedGroup, AveragedValue}
+import org.slf4j.Logger
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.monoid.AlgebirdGroup
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+import org.apache.gearpump.streaming.state.api.{PersistentState, PersistentTask}
+import org.apache.gearpump.streaming.state.impl.{Interval, Window, WindowConfig, WindowState}
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+
+object WindowAverageProcessor {
+  val LOG: Logger = LogUtil.getLogger(classOf[WindowAverageProcessor])
+}
+
+class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig)
+  extends PersistentTask[AveragedValue](taskContext, conf) {
+
+  override def persistentState: PersistentState[AveragedValue] = {
+    val group = new AlgebirdGroup(AveragedGroup)
+    val serializer = new ChillSerializer[TreeMap[Interval, AveragedValue]]
+    val window = new Window(conf.getValue[WindowConfig](WindowConfig.NAME).get)
+    new WindowState[AveragedValue](group, serializer, taskContext, window)
+  }
+
+  override def processMessage(state: PersistentState[AveragedValue],
+      message: Message): Unit = {
+    val value = AveragedValue(message.msg.asInstanceOf[String].toLong)
+    state.update(message.timestamp, value)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
deleted file mode 100644
index 040c343..0000000
--- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.streaming.examples.state.MessageCountApp._
-
-class MessageCountAppSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
-
-  before {
-    startActorSystem()
-  }
-
-  after {
-    shutdownActorSystem()
-  }
-
-  protected override def config = TestUtil.DEFAULT_CONFIG
-
-  property("MessageCount should succeed to submit application with required arguments") {
-    val requiredArgs = Array(
-      s"-$SOURCE_TOPIC", "source",
-      s"-$SINK_TOPIC", "sink",
-      s"-$ZOOKEEPER_CONNECT", "localhost:2181",
-      s"-$BROKER_LIST", "localhost:9092",
-      s"-$DEFAULT_FS", "hdfs://localhost:9000"
-    )
-    val optionalArgs = Array(
-      s"-$SOURCE_TASK", "2",
-      s"-$COUNT_TASK", "2",
-      s"-$SINK_TASK", "2"
-    )
-
-    val args = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs, optionalArgs.take(0)),
-        (requiredArgs, optionalArgs.take(2)),
-        (requiredArgs, optionalArgs.take(4)),
-        (requiredArgs, optionalArgs)
-      )
-    }
-
-    val masterReceiver = createMockMaster()
-    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
-      val args = requiredArgs ++ optionalArgs
-      Future {
-        MessageCountApp.main(masterConfig, args)
-      }
-      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-      masterReceiver.reply(SubmitApplicationResult(Success(0)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
deleted file mode 100644
index 7c1c798..0000000
--- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import com.typesafe.config.Config
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class WindowAverageAppSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
-
-  before {
-    startActorSystem()
-  }
-
-  after {
-    shutdownActorSystem()
-  }
-
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  property("WindowAverage should succeed to submit application with required arguments") {
-    val requiredArgs = Array.empty[String]
-    val optionalArgs = Array(
-      "-gen", "2",
-      "-window", "2",
-      "-window_size", "5000",
-      "-window_step", "5000"
-    )
-
-    val args = {
-      Table(
-        ("requiredArgs", "optionalArgs"),
-        (requiredArgs, optionalArgs.take(0)),
-        (requiredArgs, optionalArgs.take(2)),
-        (requiredArgs, optionalArgs.take(4)),
-        (requiredArgs, optionalArgs.take(6)),
-        (requiredArgs, optionalArgs)
-      )
-    }
-    val masterReceiver = createMockMaster()
-    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
-      val args = requiredArgs ++ optionalArgs
-
-      Future {
-        WindowAverageApp.main(masterConfig, args)
-      }
-
-      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-      masterReceiver.reply(SubmitApplicationResult(Success(0)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
deleted file mode 100644
index a69116b..0000000
--- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.state.api.PersistentTask
-import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig}
-import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
-import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
-
-class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
-
-  property("CountProcessor should update state") {
-
-    val taskContext = MockUtil.mockTaskContext
-
-    implicit val system = ActorSystem("test")
-
-    val longGen = Gen.chooseNum[Long](1, 1000)
-    forAll(longGen) {
-      (num: Long) =>
-
-        val conf = UserConfig.empty
-          .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
-          .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
-          .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
-            new InMemoryCheckpointStoreFactory)
-
-        val count = new CountProcessor(taskContext, conf)
-
-        val appMaster = TestProbe()(system)
-        when(taskContext.appMaster).thenReturn(appMaster.ref)
-
-        count.onStart(StartTime(0L))
-        appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
-
-        for (i <- 0L to num) {
-          count.onNext(Message("", i))
-          count.state.get shouldBe Some(i + 1)
-        }
-        // Next checkpoint time is not arrived yet
-        when(taskContext.upstreamMinClock).thenReturn(0L)
-        count.onNext(PersistentTask.CHECKPOINT)
-        appMaster.expectNoMsg(10.milliseconds)
-
-        // Time to checkpoint
-        when(taskContext.upstreamMinClock).thenReturn(num)
-        count.onNext(PersistentTask.CHECKPOINT)
-        // Only the state before checkpoint time is checkpointed
-        appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
-    }
-
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
deleted file mode 100644
index 18a49ac..0000000
--- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import org.mockito.Mockito._
-import org.mockito.{Matchers => MockitoMatchers}
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
-  "NumberGeneratorProcessor" should {
-    "send random numbers" in {
-
-      val taskContext = MockUtil.mockTaskContext
-
-      implicit val system = ActorSystem("test")
-
-      val mockTaskActor = TestProbe()
-
-      // Mock self ActorRef
-      when(taskContext.self).thenReturn(mockTaskActor.ref)
-
-      val conf = UserConfig.empty
-      val genNum = new NumberGeneratorProcessor(taskContext, conf)
-      genNum.onStart(StartTime(0))
-      mockTaskActor.expectMsgType[Message]
-
-      genNum.onNext(Message("next"))
-      verify(taskContext).output(MockitoMatchers.any[Message])
-      // mockTaskActor.expectMsgType[Message]
-
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
deleted file mode 100644
index a9c52aa..0000000
--- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import com.twitter.algebird.AveragedValue
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.state.api.PersistentTask
-import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig}
-import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
-import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
-
-class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers {
-  property("WindowAverageProcessor should update state") {
-
-    implicit val system = ActorSystem("test")
-    val longGen = Gen.chooseNum[Long](1, 1000)
-    forAll(longGen, longGen) {
-      (data: Long, num: Long) =>
-        val taskContext = MockUtil.mockTaskContext
-
-        val windowSize = num
-        val windowStep = num
-
-        val conf = UserConfig.empty
-          .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
-          .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
-          .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
-            new InMemoryCheckpointStoreFactory)
-          .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
-
-        val windowAverage = new WindowAverageProcessor(taskContext, conf)
-
-        val appMaster = TestProbe()(system)
-        when(taskContext.appMaster).thenReturn(appMaster.ref)
-
-        windowAverage.onStart(StartTime(0L))
-        appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
-
-        for (i <- 0L until num) {
-          windowAverage.onNext(Message("" + data, i))
-          windowAverage.state.get shouldBe Some(AveragedValue(i + 1, data))
-        }
-
-        // Next checkpoint time is not arrived yet
-        when(taskContext.upstreamMinClock).thenReturn(0L)
-        windowAverage.onNext(PersistentTask.CHECKPOINT)
-        appMaster.expectNoMsg(10.milliseconds)
-
-        // Time to checkpoint
-        when(taskContext.upstreamMinClock).thenReturn(num)
-        windowAverage.onNext(PersistentTask.CHECKPOINT)
-        appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
-    }
-
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
new file mode 100644
index 0000000..729994e
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.streaming.examples.state.MessageCountApp._
+
+class MessageCountAppSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+  before {
+    startActorSystem()
+  }
+
+  after {
+    shutdownActorSystem()
+  }
+
+  protected override def config = TestUtil.DEFAULT_CONFIG
+
+  property("MessageCount should succeed to submit application with required arguments") {
+    val requiredArgs = Array(
+      s"-$SOURCE_TOPIC", "source",
+      s"-$SINK_TOPIC", "sink",
+      s"-$ZOOKEEPER_CONNECT", "localhost:2181",
+      s"-$BROKER_LIST", "localhost:9092",
+      s"-$DEFAULT_FS", "hdfs://localhost:9000"
+    )
+    val optionalArgs = Array(
+      s"-$SOURCE_TASK", "2",
+      s"-$COUNT_TASK", "2",
+      s"-$SINK_TASK", "2"
+    )
+
+    val args = {
+      Table(
+        ("requiredArgs", "optionalArgs"),
+        (requiredArgs, optionalArgs.take(0)),
+        (requiredArgs, optionalArgs.take(2)),
+        (requiredArgs, optionalArgs.take(4)),
+        (requiredArgs, optionalArgs)
+      )
+    }
+
+    val masterReceiver = createMockMaster()
+    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
+      val args = requiredArgs ++ optionalArgs
+      Future {
+        MessageCountApp.main(masterConfig, args)
+      }
+      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+      masterReceiver.reply(SubmitApplicationResult(Success(0)))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
new file mode 100644
index 0000000..00cb290
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+class WindowAverageAppSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+  before {
+    startActorSystem()
+  }
+
+  after {
+    shutdownActorSystem()
+  }
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  property("WindowAverage should succeed to submit application with required arguments") {
+    val requiredArgs = Array.empty[String]
+    val optionalArgs = Array(
+      "-gen", "2",
+      "-window", "2",
+      "-window_size", "5000",
+      "-window_step", "5000"
+    )
+
+    val args = {
+      Table(
+        ("requiredArgs", "optionalArgs"),
+        (requiredArgs, optionalArgs.take(0)),
+        (requiredArgs, optionalArgs.take(2)),
+        (requiredArgs, optionalArgs.take(4)),
+        (requiredArgs, optionalArgs.take(6)),
+        (requiredArgs, optionalArgs)
+      )
+    }
+    val masterReceiver = createMockMaster()
+    forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
+      val args = requiredArgs ++ optionalArgs
+
+      Future {
+        WindowAverageApp.main(masterConfig, args)
+      }
+
+      masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+      masterReceiver.reply(SubmitApplicationResult(Success(0)))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
new file mode 100644
index 0000000..cdc8cb2
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.state.api.PersistentTask
+import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+
+class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
+
+  property("CountProcessor should update state") {
+
+    val taskContext = MockUtil.mockTaskContext
+
+    implicit val system = ActorSystem("test")
+
+    val longGen = Gen.chooseNum[Long](1, 1000)
+    forAll(longGen) {
+      (num: Long) =>
+
+        val conf = UserConfig.empty
+          .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+          .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
+          .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+            new InMemoryCheckpointStoreFactory)
+
+        val count = new CountProcessor(taskContext, conf)
+
+        val appMaster = TestProbe()(system)
+        when(taskContext.appMaster).thenReturn(appMaster.ref)
+
+        count.onStart(StartTime(0L))
+        appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
+
+        for (i <- 0L to num) {
+          count.onNext(Message("", i))
+          count.state.get shouldBe Some(i + 1)
+        }
+        // Next checkpoint time is not arrived yet
+        when(taskContext.upstreamMinClock).thenReturn(0L)
+        count.onNext(PersistentTask.CHECKPOINT)
+        appMaster.expectNoMsg(10.milliseconds)
+
+        // Time to checkpoint
+        when(taskContext.upstreamMinClock).thenReturn(num)
+        count.onNext(PersistentTask.CHECKPOINT)
+        // Only the state before checkpoint time is checkpointed
+        appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
+    }
+
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
new file mode 100644
index 0000000..2268994
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import org.mockito.Mockito._
+import org.mockito.{Matchers => MockitoMatchers}
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
+class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
+  "NumberGeneratorProcessor" should {
+    "send random numbers" in {
+
+      val taskContext = MockUtil.mockTaskContext
+
+      implicit val system = ActorSystem("test")
+
+      val mockTaskActor = TestProbe()
+
+      // Mock self ActorRef
+      when(taskContext.self).thenReturn(mockTaskActor.ref)
+
+      val conf = UserConfig.empty
+      val genNum = new NumberGeneratorProcessor(taskContext, conf)
+      genNum.onStart(StartTime(0))
+      mockTaskActor.expectMsgType[Message]
+
+      genNum.onNext(Message("next"))
+      verify(taskContext).output(MockitoMatchers.any[Message])
+      // mockTaskActor.expectMsgType[Message]
+
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}