You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:41 UTC
[31/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
new file mode 100644
index 0000000..796b0d2
--- /dev/null
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.sol
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.Cancellable
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig)
+ extends Task(taskContext, conf) {
+ import taskContext.output
+
+ val taskConf = taskContext
+
+ private var msgCount: Long = 0
+ private var scheduler: Cancellable = null
+ private var snapShotWordCount: Long = 0
+ private var snapShotTime: Long = 0
+
+ override def onStart(startTime: StartTime): Unit = {
+ scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
+ new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount())
+ snapShotTime = System.currentTimeMillis()
+ }
+
+ override def onNext(msg: Message): Unit = {
+ output(msg)
+ msgCount = msgCount + 1
+ }
+
+ override def onStop(): Unit = {
+ if (scheduler != null) {
+ scheduler.cancel()
+ }
+ }
+
+ def reportWordCount(): Unit = {
+ val current: Long = System.currentTimeMillis()
+ LOG.info(s"Task ${taskConf.taskId} " +
+ s"Throughput: ${(msgCount - snapShotWordCount, (current - snapShotTime) / 1000)} " +
+ s"(words, second)")
+ snapShotWordCount = msgCount
+ snapShotTime = current
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
new file mode 100644
index 0000000..84ed038
--- /dev/null
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.sol
+
+import java.util.Random
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.examples.sol.SOLStreamProducer._
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
+ extends Task(taskContext, conf) {
+
+ import taskContext.output
+
+ private val sizeInBytes = conf.getInt(SOLStreamProducer.BYTES_PER_MESSAGE)
+ .getOrElse(DEFAULT_MESSAGE_SIZE)
+ private var messages: Array[String] = null
+ private var rand: Random = null
+ private var messageCount: Long = 0
+
+ override def onStart(startTime: StartTime): Unit = {
+ prepareRandomMessage
+ self ! Start
+ }
+
+ private def prepareRandomMessage = {
+ rand = new Random()
+ val differentMessages = 100
+ messages = new Array(differentMessages)
+
+ 0.until(differentMessages).map { index =>
+ val sb = new StringBuilder(sizeInBytes)
+ // Even though java encodes strings in UCS2, the serialized version sent by the tuples
+ // is UTF8, so it should be a single byte
+ 0.until(sizeInBytes).foldLeft(sb) { (sb, j) =>
+ sb.append(rand.nextInt(9))
+ }
+ messages(index) = sb.toString()
+ }
+ }
+
+ override def onNext(msg: Message): Unit = {
+ val message = messages(rand.nextInt(messages.length))
+ output(new Message(message, System.currentTimeMillis()))
+ messageCount = messageCount + 1L
+ self ! messageSourceMinClock
+ }
+
+ // messageSourceMinClock represent the min clock of the message source
+ private def messageSourceMinClock: Message = {
+ Message("tick", System.currentTimeMillis())
+ }
+}
+
+object SOLStreamProducer {
+ val DEFAULT_MESSAGE_SIZE = 100
+ // Bytes
+ val BYTES_PER_MESSAGE = "bytesPerMessage"
+ val Start = Message("start")
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
deleted file mode 100644
index 6e266d0..0000000
--- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.sol
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import com.typesafe.config.Config
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class SOLSpec
- extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
- override def beforeAll {
- startActorSystem()
- }
-
- override def afterAll {
- shutdownActorSystem()
- }
-
- override def config: Config = TestUtil.DEFAULT_CONFIG
-
- property("SOL should succeed to submit application with required arguments") {
- val requiredArgs = Array.empty[String]
- val optionalArgs = Array(
- "-streamProducer", "1",
- "-streamProcessor", "1",
- "-bytesPerMessage", "100",
- "-stages", "10")
-
- val args = {
- Table(
- ("requiredArgs", "optionalArgs"),
- (requiredArgs, optionalArgs)
- )
- }
- val masterReceiver = createMockMaster()
- forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
- val args = requiredArgs ++ optionalArgs
-
- Future {
- SOL.main(masterConfig, args)
- }
-
- masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
- masterReceiver.reply(SubmitApplicationResult(Success(0)))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
deleted file mode 100644
index f5035b4..0000000
--- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.sol
-
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class SOLStreamProcessorSpec extends FlatSpec with Matchers {
-
- it should "pass the message downstream" in {
- val stringGenerator = Gen.alphaStr
- val context = MockUtil.mockTaskContext
-
- val sol = new SOLStreamProcessor(context, UserConfig.empty)
- sol.onStart(StartTime(0))
- val msg = Message("msg")
- sol.onNext(msg)
- verify(context, times(1)).output(msg)
- sol.onStop()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
deleted file mode 100644
index 4bac30c..0000000
--- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.sol
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class SOLStreamProducerSpec extends WordSpec with Matchers {
-
- "SOLStreamProducer" should {
- "producer message continuously" in {
-
- val conf = UserConfig.empty.withInt(SOLStreamProducer.BYTES_PER_MESSAGE, 100)
- val context = MockUtil.mockTaskContext
-
- val producer = new SOLStreamProducer(context, conf)
- producer.onStart(StartTime(0))
- producer.onNext(Message("msg"))
- verify(context).output(any[Message])
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala
new file mode 100644
index 0000000..29e8284
--- /dev/null
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.sol
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+class SOLSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
+ override def beforeAll {
+ startActorSystem()
+ }
+
+ override def afterAll {
+ shutdownActorSystem()
+ }
+
+ override def config: Config = TestUtil.DEFAULT_CONFIG
+
+ property("SOL should succeed to submit application with required arguments") {
+ val requiredArgs = Array.empty[String]
+ val optionalArgs = Array(
+ "-streamProducer", "1",
+ "-streamProcessor", "1",
+ "-bytesPerMessage", "100",
+ "-stages", "10")
+
+ val args = {
+ Table(
+ ("requiredArgs", "optionalArgs"),
+ (requiredArgs, optionalArgs)
+ )
+ }
+ val masterReceiver = createMockMaster()
+ forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
+ val args = requiredArgs ++ optionalArgs
+
+ Future {
+ SOL.main(masterConfig, args)
+ }
+
+ masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+ masterReceiver.reply(SubmitApplicationResult(Success(0)))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
new file mode 100644
index 0000000..a6cc966
--- /dev/null
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.sol
+
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
+class SOLStreamProcessorSpec extends FlatSpec with Matchers {
+
+ it should "pass the message downstream" in {
+ val stringGenerator = Gen.alphaStr
+ val context = MockUtil.mockTaskContext
+
+ val sol = new SOLStreamProcessor(context, UserConfig.empty)
+ sol.onStart(StartTime(0))
+ val msg = Message("msg")
+ sol.onNext(msg)
+ verify(context, times(1)).output(msg)
+ sol.onStop()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
new file mode 100644
index 0000000..2316de8
--- /dev/null
+++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.sol
+
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
+class SOLStreamProducerSpec extends WordSpec with Matchers {
+
+ "SOLStreamProducer" should {
+ "producer message continuously" in {
+
+ val conf = UserConfig.empty.withInt(SOLStreamProducer.BYTES_PER_MESSAGE, 100)
+ val context = MockUtil.mockTaskContext
+
+ val producer = new SOLStreamProducer(context, conf)
+ producer.onStart(StartTime(0))
+ producer.onNext(Message("msg"))
+ verify(context).output(any[Message])
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/resources/state.conf
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/resources/state.conf b/examples/streaming/state/src/main/resources/state.conf
index 7e26d4b..a7eec2b 100644
--- a/examples/streaming/state/src/main/resources/state.conf
+++ b/examples/streaming/state/src/main/resources/state.conf
@@ -1,6 +1,6 @@
state {
checkpoint {
interval = 1000 # milliseconds
- store.factory = io.gearpump.streaming.kafka.KafkaCheckpointStoreFactory
+ store.factory = org.apache.gearpump.streaming.kafka.KafkaCheckpointStoreFactory
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
deleted file mode 100644
index 3c6cde6..0000000
--- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state
-
-import akka.actor.ActorSystem
-import org.apache.hadoop.conf.Configuration
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.examples.state.processor.CountProcessor
-import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
-import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
-import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory}
-import io.gearpump.streaming.sink.DataSinkProcessor
-import io.gearpump.streaming.source.DataSourceProcessor
-import io.gearpump.streaming.state.impl.PersistentStateConfig
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph.Node
-import io.gearpump.util.{AkkaApp, Graph}
-
-/** Does exactly-once message count */
-object MessageCountApp extends AkkaApp with ArgumentsParser {
- val SOURCE_TASK = "sourceTask"
- val COUNT_TASK = "countTask"
- val SINK_TASK = "sinkTask"
- val SOURCE_TOPIC = "sourceTopic"
- val SINK_TOPIC = "sinkTopic"
- val ZOOKEEPER_CONNECT = "zookeeperConnect"
- val BROKER_LIST = "brokerList"
- val DEFAULT_FS = "defaultFS"
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false,
- defaultValue = Some(1)),
- COUNT_TASK -> CLIOption("<how many count tasks>", required = false, defaultValue = Some(1)),
- SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
- defaultValue = Some(1)),
- SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true),
- SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true),
- ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
- required = true),
- BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
- DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. hdfs://localhost:9000>",
- required = true)
- )
-
- def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
- val hadoopConfig = new Configuration
- hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS))
- val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig,
- // Rotates on 1KB
- new FileSizeRotation(1000))
- val taskConfig = UserConfig.empty
- .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
- .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
- .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory)
-
- val zookeeperConnect = config.getString(ZOOKEEPER_CONNECT)
- val brokerList = config.getString(BROKER_LIST)
- val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList)
- val sourceTopic = config.getString(SOURCE_TOPIC)
- val kafkaSource = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory)
- val sourceProcessor = DataSourceProcessor(kafkaSource, config.getInt(SOURCE_TASK))
- val countProcessor = Processor[CountProcessor](config.getInt(COUNT_TASK), taskConf = taskConfig)
- val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList)
- val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK))
- val partitioner = new HashPartitioner()
- val graph = Graph(sourceProcessor ~ partitioner
- ~> countProcessor ~ partitioner ~> sinkProcessor)
- val app = StreamApplication("MessageCount", graph, UserConfig.empty)
- app
- }
-
- def main(akkaConf: Config, args: Array[String]): Unit = {
-
- val config = parse(args)
- val context = ClientContext(akkaConf)
- implicit val system = context.system
- val appId = context.submit(application(config))
- context.close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
deleted file mode 100644
index 6f3bc79..0000000
--- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state
-
-import akka.actor.ActorSystem
-import org.apache.hadoop.conf.Configuration
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor, WindowAverageProcessor}
-import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
-import io.gearpump.streaming.state.impl.{PersistentStateConfig, WindowConfig}
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph.Node
-import io.gearpump.util.{AkkaApp, Graph}
-
-/** Does exactly-once sliding window based average aggregation */
-object WindowAverageApp extends AkkaApp with ArgumentsParser {
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "gen" -> CLIOption("<how many gen tasks>", required = false, defaultValue = Some(1)),
- "window" -> CLIOption("<how mange window tasks", required = false, defaultValue = Some(1)),
- "window_size" -> CLIOption("<window size in milliseconds>", required = false,
- defaultValue = Some(5000)),
- "window_step" -> CLIOption("<window step in milliseconds>", required = false,
- defaultValue = Some(5000))
- )
-
- def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
- val windowSize = config.getInt("window_size")
- val windowStep = config.getInt("window_step")
- val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", new Configuration)
- val taskConfig = UserConfig.empty.
- withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
- .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
- .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory)
- .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
- val gen = Processor[NumberGeneratorProcessor](config.getInt("gen"))
- val count = Processor[WindowAverageProcessor](config.getInt("window"), taskConf = taskConfig)
- val partitioner = new HashPartitioner()
- val app = StreamApplication("WindowAverage", Graph(gen ~ partitioner ~> count),
- UserConfig.empty)
- app
- }
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- val config = parse(args)
- val context = ClientContext(akkaConf)
-
- implicit val system = context.system
- val appId = context.submit(application(config))
- context.close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
deleted file mode 100644
index 6610b91..0000000
--- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.monoid.AlgebirdMonoid
-import io.gearpump.streaming.serializer.ChillSerializer
-import io.gearpump.streaming.state.api.{PersistentState, PersistentTask}
-import io.gearpump.streaming.state.impl.NonWindowState
-import io.gearpump.streaming.task.TaskContext
-
-class CountProcessor(taskContext: TaskContext, conf: UserConfig)
- extends PersistentTask[Int](taskContext, conf) {
-
- override def persistentState: PersistentState[Int] = {
- import com.twitter.algebird.Monoid.intMonoid
- new NonWindowState[Int](new AlgebirdMonoid(intMonoid), new ChillSerializer[Int])
- }
-
- override def processMessage(state: PersistentState[Int], message: Message): Unit = {
- state.update(message.timestamp, 1)
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
deleted file mode 100644
index fa9854f..0000000
--- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
- extends Task(taskContext, conf) {
- import taskContext.output
-
- private var num = 0L
- override def onStart(startTime: StartTime): Unit = {
- num = startTime.startTime
- self ! Message("start")
- }
-
- override def onNext(msg: Message): Unit = {
- output(Message(num + "", num))
- num += 1
-
- import scala.concurrent.duration._
- taskContext.scheduleOnce(Duration(1, MILLISECONDS))(self ! Message("next"))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
deleted file mode 100644
index e7ac9b3..0000000
--- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import scala.collection.immutable.TreeMap
-
-import com.twitter.algebird.{AveragedGroup, AveragedValue}
-import org.slf4j.Logger
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.monoid.AlgebirdGroup
-import io.gearpump.streaming.serializer.ChillSerializer
-import io.gearpump.streaming.state.api.{PersistentState, PersistentTask}
-import io.gearpump.streaming.state.impl.{Interval, Window, WindowConfig, WindowState}
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.util.LogUtil
-
-object WindowAverageProcessor {
- val LOG: Logger = LogUtil.getLogger(classOf[WindowAverageProcessor])
-}
-
-class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig)
- extends PersistentTask[AveragedValue](taskContext, conf) {
-
- override def persistentState: PersistentState[AveragedValue] = {
- val group = new AlgebirdGroup(AveragedGroup)
- val serializer = new ChillSerializer[TreeMap[Interval, AveragedValue]]
- val window = new Window(conf.getValue[WindowConfig](WindowConfig.NAME).get)
- new WindowState[AveragedValue](group, serializer, taskContext, window)
- }
-
- override def processMessage(state: PersistentState[AveragedValue],
- message: Message): Unit = {
- val value = AveragedValue(message.msg.asInstanceOf[String].toLong)
- state.update(message.timestamp, value)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
new file mode 100644
index 0000000..13bef0d
--- /dev/null
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state
+
+import akka.actor.ActorSystem
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.examples.state.processor.CountProcessor
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory}
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.DataSourceProcessor
+import org.apache.gearpump.streaming.state.impl.PersistentStateConfig
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/** Does exactly-once message count */
+object MessageCountApp extends AkkaApp with ArgumentsParser {
+ val SOURCE_TASK = "sourceTask"
+ val COUNT_TASK = "countTask"
+ val SINK_TASK = "sinkTask"
+ val SOURCE_TOPIC = "sourceTopic"
+ val SINK_TOPIC = "sinkTopic"
+ val ZOOKEEPER_CONNECT = "zookeeperConnect"
+ val BROKER_LIST = "brokerList"
+ val DEFAULT_FS = "defaultFS"
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false,
+ defaultValue = Some(1)),
+ COUNT_TASK -> CLIOption("<how many count tasks>", required = false, defaultValue = Some(1)),
+ SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
+ defaultValue = Some(1)),
+ SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true),
+ SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true),
+ ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
+ required = true),
+ BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
+ DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. hdfs://localhost:9000>",
+ required = true)
+ )
+
+ def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
+ val hadoopConfig = new Configuration
+ hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS))
+ val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig,
+ // Rotates on 1KB
+ new FileSizeRotation(1000))
+ val taskConfig = UserConfig.empty
+ .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+ .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+ .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory)
+
+ val zookeeperConnect = config.getString(ZOOKEEPER_CONNECT)
+ val brokerList = config.getString(BROKER_LIST)
+ val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList)
+ val sourceTopic = config.getString(SOURCE_TOPIC)
+ val kafkaSource = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory)
+ val sourceProcessor = DataSourceProcessor(kafkaSource, config.getInt(SOURCE_TASK))
+ val countProcessor = Processor[CountProcessor](config.getInt(COUNT_TASK), taskConf = taskConfig)
+ val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList)
+ val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK))
+ val partitioner = new HashPartitioner()
+ val graph = Graph(sourceProcessor ~ partitioner
+ ~> countProcessor ~ partitioner ~> sinkProcessor)
+ val app = StreamApplication("MessageCount", graph, UserConfig.empty)
+ app
+ }
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+
+ val config = parse(args)
+ val context = ClientContext(akkaConf)
+ implicit val system = context.system
+ val appId = context.submit(application(config))
+ context.close()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
new file mode 100644
index 0000000..629deb7
--- /dev/null
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state
+
+import akka.actor.ActorSystem
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor, WindowAverageProcessor}
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import org.apache.gearpump.streaming.state.impl.{PersistentStateConfig, WindowConfig}
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/** Does exactly-once sliding window based average aggregation */
+object WindowAverageApp extends AkkaApp with ArgumentsParser {
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gen" -> CLIOption("<how many gen tasks>", required = false, defaultValue = Some(1)),
+ "window" -> CLIOption("<how mange window tasks", required = false, defaultValue = Some(1)),
+ "window_size" -> CLIOption("<window size in milliseconds>", required = false,
+ defaultValue = Some(5000)),
+ "window_step" -> CLIOption("<window step in milliseconds>", required = false,
+ defaultValue = Some(5000))
+ )
+
+ def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = {
+ val windowSize = config.getInt("window_size")
+ val windowStep = config.getInt("window_step")
+ val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", new Configuration)
+ val taskConfig = UserConfig.empty.
+ withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+ .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+ .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory)
+ .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
+ val gen = Processor[NumberGeneratorProcessor](config.getInt("gen"))
+ val count = Processor[WindowAverageProcessor](config.getInt("window"), taskConf = taskConfig)
+ val partitioner = new HashPartitioner()
+ val app = StreamApplication("WindowAverage", Graph(gen ~ partitioner ~> count),
+ UserConfig.empty)
+ app
+ }
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ val context = ClientContext(akkaConf)
+
+ implicit val system = context.system
+ val appId = context.submit(application(config))
+ context.close()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
new file mode 100644
index 0000000..9650a0a
--- /dev/null
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.monoid.AlgebirdMonoid
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+import org.apache.gearpump.streaming.state.api.{PersistentState, PersistentTask}
+import org.apache.gearpump.streaming.state.impl.NonWindowState
+import org.apache.gearpump.streaming.task.TaskContext
+
+class CountProcessor(taskContext: TaskContext, conf: UserConfig)
+ extends PersistentTask[Int](taskContext, conf) {
+
+ override def persistentState: PersistentState[Int] = {
+ import com.twitter.algebird.Monoid.intMonoid
+ new NonWindowState[Int](new AlgebirdMonoid(intMonoid), new ChillSerializer[Int])
+ }
+
+ override def processMessage(state: PersistentState[Int], message: Message): Unit = {
+ state.update(message.timestamp, 1)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
new file mode 100644
index 0000000..0e85f32
--- /dev/null
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
+ extends Task(taskContext, conf) {
+ import taskContext.output
+
+ private var num = 0L
+ override def onStart(startTime: StartTime): Unit = {
+ num = startTime.startTime
+ self ! Message("start")
+ }
+
+ override def onNext(msg: Message): Unit = {
+ output(Message(num + "", num))
+ num += 1
+
+ import scala.concurrent.duration._
+ taskContext.scheduleOnce(Duration(1, MILLISECONDS))(self ! Message("next"))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
new file mode 100644
index 0000000..eea2504
--- /dev/null
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import scala.collection.immutable.TreeMap
+
+import com.twitter.algebird.{AveragedGroup, AveragedValue}
+import org.slf4j.Logger
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.monoid.AlgebirdGroup
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+import org.apache.gearpump.streaming.state.api.{PersistentState, PersistentTask}
+import org.apache.gearpump.streaming.state.impl.{Interval, Window, WindowConfig, WindowState}
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+
+object WindowAverageProcessor {
+ val LOG: Logger = LogUtil.getLogger(classOf[WindowAverageProcessor])
+}
+
+class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig)
+ extends PersistentTask[AveragedValue](taskContext, conf) {
+
+ override def persistentState: PersistentState[AveragedValue] = {
+ val group = new AlgebirdGroup(AveragedGroup)
+ val serializer = new ChillSerializer[TreeMap[Interval, AveragedValue]]
+ val window = new Window(conf.getValue[WindowConfig](WindowConfig.NAME).get)
+ new WindowState[AveragedValue](group, serializer, taskContext, window)
+ }
+
+ override def processMessage(state: PersistentState[AveragedValue],
+ message: Message): Unit = {
+ val value = AveragedValue(message.msg.asInstanceOf[String].toLong)
+ state.update(message.timestamp, value)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
deleted file mode 100644
index 040c343..0000000
--- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.streaming.examples.state.MessageCountApp._
-
-class MessageCountAppSpec
- extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
-
- before {
- startActorSystem()
- }
-
- after {
- shutdownActorSystem()
- }
-
- protected override def config = TestUtil.DEFAULT_CONFIG
-
- property("MessageCount should succeed to submit application with required arguments") {
- val requiredArgs = Array(
- s"-$SOURCE_TOPIC", "source",
- s"-$SINK_TOPIC", "sink",
- s"-$ZOOKEEPER_CONNECT", "localhost:2181",
- s"-$BROKER_LIST", "localhost:9092",
- s"-$DEFAULT_FS", "hdfs://localhost:9000"
- )
- val optionalArgs = Array(
- s"-$SOURCE_TASK", "2",
- s"-$COUNT_TASK", "2",
- s"-$SINK_TASK", "2"
- )
-
- val args = {
- Table(
- ("requiredArgs", "optionalArgs"),
- (requiredArgs, optionalArgs.take(0)),
- (requiredArgs, optionalArgs.take(2)),
- (requiredArgs, optionalArgs.take(4)),
- (requiredArgs, optionalArgs)
- )
- }
-
- val masterReceiver = createMockMaster()
- forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
- val args = requiredArgs ++ optionalArgs
- Future {
- MessageCountApp.main(masterConfig, args)
- }
- masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
- masterReceiver.reply(SubmitApplicationResult(Success(0)))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
deleted file mode 100644
index 7c1c798..0000000
--- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import com.typesafe.config.Config
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class WindowAverageAppSpec
- extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
-
- before {
- startActorSystem()
- }
-
- after {
- shutdownActorSystem()
- }
-
- override def config: Config = TestUtil.DEFAULT_CONFIG
-
- property("WindowAverage should succeed to submit application with required arguments") {
- val requiredArgs = Array.empty[String]
- val optionalArgs = Array(
- "-gen", "2",
- "-window", "2",
- "-window_size", "5000",
- "-window_step", "5000"
- )
-
- val args = {
- Table(
- ("requiredArgs", "optionalArgs"),
- (requiredArgs, optionalArgs.take(0)),
- (requiredArgs, optionalArgs.take(2)),
- (requiredArgs, optionalArgs.take(4)),
- (requiredArgs, optionalArgs.take(6)),
- (requiredArgs, optionalArgs)
- )
- }
- val masterReceiver = createMockMaster()
- forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
- val args = requiredArgs ++ optionalArgs
-
- Future {
- WindowAverageApp.main(masterConfig, args)
- }
-
- masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
- masterReceiver.reply(SubmitApplicationResult(Success(0)))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
deleted file mode 100644
index a69116b..0000000
--- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.state.api.PersistentTask
-import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig}
-import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
-import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
-
-class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
-
- property("CountProcessor should update state") {
-
- val taskContext = MockUtil.mockTaskContext
-
- implicit val system = ActorSystem("test")
-
- val longGen = Gen.chooseNum[Long](1, 1000)
- forAll(longGen) {
- (num: Long) =>
-
- val conf = UserConfig.empty
- .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
- .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
- .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
- new InMemoryCheckpointStoreFactory)
-
- val count = new CountProcessor(taskContext, conf)
-
- val appMaster = TestProbe()(system)
- when(taskContext.appMaster).thenReturn(appMaster.ref)
-
- count.onStart(StartTime(0L))
- appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
-
- for (i <- 0L to num) {
- count.onNext(Message("", i))
- count.state.get shouldBe Some(i + 1)
- }
- // Next checkpoint time is not arrived yet
- when(taskContext.upstreamMinClock).thenReturn(0L)
- count.onNext(PersistentTask.CHECKPOINT)
- appMaster.expectNoMsg(10.milliseconds)
-
- // Time to checkpoint
- when(taskContext.upstreamMinClock).thenReturn(num)
- count.onNext(PersistentTask.CHECKPOINT)
- // Only the state before checkpoint time is checkpointed
- appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
- }
-
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
deleted file mode 100644
index 18a49ac..0000000
--- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import org.mockito.Mockito._
-import org.mockito.{Matchers => MockitoMatchers}
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
- "NumberGeneratorProcessor" should {
- "send random numbers" in {
-
- val taskContext = MockUtil.mockTaskContext
-
- implicit val system = ActorSystem("test")
-
- val mockTaskActor = TestProbe()
-
- // Mock self ActorRef
- when(taskContext.self).thenReturn(mockTaskActor.ref)
-
- val conf = UserConfig.empty
- val genNum = new NumberGeneratorProcessor(taskContext, conf)
- genNum.onStart(StartTime(0))
- mockTaskActor.expectMsgType[Message]
-
- genNum.onNext(Message("next"))
- verify(taskContext).output(MockitoMatchers.any[Message])
- // mockTaskActor.expectMsgType[Message]
-
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
deleted file mode 100644
index a9c52aa..0000000
--- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.state.processor
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import com.twitter.algebird.AveragedValue
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.state.api.PersistentTask
-import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig}
-import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
-import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
-
-class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers {
- property("WindowAverageProcessor should update state") {
-
- implicit val system = ActorSystem("test")
- val longGen = Gen.chooseNum[Long](1, 1000)
- forAll(longGen, longGen) {
- (data: Long, num: Long) =>
- val taskContext = MockUtil.mockTaskContext
-
- val windowSize = num
- val windowStep = num
-
- val conf = UserConfig.empty
- .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
- .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
- .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
- new InMemoryCheckpointStoreFactory)
- .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
-
- val windowAverage = new WindowAverageProcessor(taskContext, conf)
-
- val appMaster = TestProbe()(system)
- when(taskContext.appMaster).thenReturn(appMaster.ref)
-
- windowAverage.onStart(StartTime(0L))
- appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
-
- for (i <- 0L until num) {
- windowAverage.onNext(Message("" + data, i))
- windowAverage.state.get shouldBe Some(AveragedValue(i + 1, data))
- }
-
- // Next checkpoint time is not arrived yet
- when(taskContext.upstreamMinClock).thenReturn(0L)
- windowAverage.onNext(PersistentTask.CHECKPOINT)
- appMaster.expectNoMsg(10.milliseconds)
-
- // Time to checkpoint
- when(taskContext.upstreamMinClock).thenReturn(num)
- windowAverage.onNext(PersistentTask.CHECKPOINT)
- appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
- }
-
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
new file mode 100644
index 0000000..729994e
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.streaming.examples.state.MessageCountApp._
+
+class MessageCountAppSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+ before {
+ startActorSystem()
+ }
+
+ after {
+ shutdownActorSystem()
+ }
+
+ protected override def config = TestUtil.DEFAULT_CONFIG
+
+ property("MessageCount should succeed to submit application with required arguments") {
+ val requiredArgs = Array(
+ s"-$SOURCE_TOPIC", "source",
+ s"-$SINK_TOPIC", "sink",
+ s"-$ZOOKEEPER_CONNECT", "localhost:2181",
+ s"-$BROKER_LIST", "localhost:9092",
+ s"-$DEFAULT_FS", "hdfs://localhost:9000"
+ )
+ val optionalArgs = Array(
+ s"-$SOURCE_TASK", "2",
+ s"-$COUNT_TASK", "2",
+ s"-$SINK_TASK", "2"
+ )
+
+ val args = {
+ Table(
+ ("requiredArgs", "optionalArgs"),
+ (requiredArgs, optionalArgs.take(0)),
+ (requiredArgs, optionalArgs.take(2)),
+ (requiredArgs, optionalArgs.take(4)),
+ (requiredArgs, optionalArgs)
+ )
+ }
+
+ val masterReceiver = createMockMaster()
+ forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
+ val args = requiredArgs ++ optionalArgs
+ Future {
+ MessageCountApp.main(masterConfig, args)
+ }
+ masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+ masterReceiver.reply(SubmitApplicationResult(Success(0)))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
new file mode 100644
index 0000000..00cb290
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+class WindowAverageAppSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+ before {
+ startActorSystem()
+ }
+
+ after {
+ shutdownActorSystem()
+ }
+
+ override def config: Config = TestUtil.DEFAULT_CONFIG
+
+ property("WindowAverage should succeed to submit application with required arguments") {
+ val requiredArgs = Array.empty[String]
+ val optionalArgs = Array(
+ "-gen", "2",
+ "-window", "2",
+ "-window_size", "5000",
+ "-window_step", "5000"
+ )
+
+ val args = {
+ Table(
+ ("requiredArgs", "optionalArgs"),
+ (requiredArgs, optionalArgs.take(0)),
+ (requiredArgs, optionalArgs.take(2)),
+ (requiredArgs, optionalArgs.take(4)),
+ (requiredArgs, optionalArgs.take(6)),
+ (requiredArgs, optionalArgs)
+ )
+ }
+ val masterReceiver = createMockMaster()
+ forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
+ val args = requiredArgs ++ optionalArgs
+
+ Future {
+ WindowAverageApp.main(masterConfig, args)
+ }
+
+ masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+ masterReceiver.reply(SubmitApplicationResult(Success(0)))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
new file mode 100644
index 0000000..cdc8cb2
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.state.api.PersistentTask
+import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+
+class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
+
+ property("CountProcessor should update state") {
+
+ val taskContext = MockUtil.mockTaskContext
+
+ implicit val system = ActorSystem("test")
+
+ val longGen = Gen.chooseNum[Long](1, 1000)
+ forAll(longGen) {
+ (num: Long) =>
+
+ val conf = UserConfig.empty
+ .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+ .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
+ .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+ new InMemoryCheckpointStoreFactory)
+
+ val count = new CountProcessor(taskContext, conf)
+
+ val appMaster = TestProbe()(system)
+ when(taskContext.appMaster).thenReturn(appMaster.ref)
+
+ count.onStart(StartTime(0L))
+ appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
+
+ for (i <- 0L to num) {
+ count.onNext(Message("", i))
+ count.state.get shouldBe Some(i + 1)
+ }
+ // Next checkpoint time is not arrived yet
+ when(taskContext.upstreamMinClock).thenReturn(0L)
+ count.onNext(PersistentTask.CHECKPOINT)
+ appMaster.expectNoMsg(10.milliseconds)
+
+ // Time to checkpoint
+ when(taskContext.upstreamMinClock).thenReturn(num)
+ count.onNext(PersistentTask.CHECKPOINT)
+ // Only the state before checkpoint time is checkpointed
+ appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
+ }
+
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
new file mode 100644
index 0000000..2268994
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.processor
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import org.mockito.Mockito._
+import org.mockito.{Matchers => MockitoMatchers}
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+
+class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
+ "NumberGeneratorProcessor" should {
+ "send random numbers" in {
+
+ val taskContext = MockUtil.mockTaskContext
+
+ implicit val system = ActorSystem("test")
+
+ val mockTaskActor = TestProbe()
+
+ // Mock self ActorRef
+ when(taskContext.self).thenReturn(mockTaskActor.ref)
+
+ val conf = UserConfig.empty
+ val genNum = new NumberGeneratorProcessor(taskContext, conf)
+ genNum.onStart(StartTime(0))
+ mockTaskActor.expectMsgType[Message]
+
+ genNum.onNext(Message("next"))
+ verify(taskContext).output(MockitoMatchers.any[Message])
+ // mockTaskActor.expectMsgType[Message]
+
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+ }
+}