You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/05/18 06:03:50 UTC
[1/2] incubator-gearpump git commit: [GEARPUMP-312] Add Message trait
and DefaultMessage impl
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 000e846ab -> 176d82763
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index c6817f5..79bcc2a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -19,14 +19,13 @@
package org.apache.gearpump.streaming.task
import org.slf4j.Logger
-
import com.google.common.primitives.Shorts
import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
import org.apache.gearpump.streaming.LifeTime
import org.apache.gearpump.streaming.task.Subscription._
import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message, TimeStamp}
/**
* Manges the output and message clock for single downstream processor
@@ -103,14 +102,16 @@ class Subscription(
var count = 0
// Only sends message whose timestamp matches the lifeTime
- if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timeInMillis)) {
+ if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(
+ msg.timestamp.toEpochMilli)) {
val targetTask = TaskId(processorId, partition)
transport.transport(msg, targetTask)
- this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timeInMillis)
+ this.minClockValue(partition) = Math.min(this.minClockValue(partition),
+ msg.timestamp.toEpochMilli)
this.candidateMinClock(partition) =
- Math.min(this.candidateMinClock(partition), msg.timeInMillis)
+ Math.min(this.candidateMinClock(partition), msg.timestamp.toEpochMilli)
incrementMessageCount(partition, 1)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
index 5b174bd..90a8bff 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -24,7 +24,6 @@ import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor.Receive
import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
import org.slf4j.Logger
-
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.util.LogUtil
import org.apache.gearpump.{Message, TimeStamp}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 10648b4..8ef45f3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -35,7 +35,7 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.streaming.task.TaskActor._
import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
-import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message, TimeStamp}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index 1b3f30c..f5f099c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -26,7 +26,7 @@ import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
import org.slf4j.Logger
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{TimeStamp, Message}
+import org.apache.gearpump.{Message, TimeStamp}
/**
* This provides TaskContext for user defined tasks
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index bcf96e4..c223a53 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, Proce
import org.apache.gearpump.transport.HostPort
import org.apache.gearpump.util.Graph
import org.apache.gearpump.util.Graph._
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.TimeStamp
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index a9b23fe..f5d7c20 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -268,7 +268,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
source.onWatermarkProgress(Watermark.MAX)
data.foreach { s =>
verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
- message => message.msg == s))
+ message => message.value == s))
}
// Source with transformer
@@ -282,7 +282,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
another.onWatermarkProgress(Watermark.MAX)
data.foreach { s =>
verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
- message => message.msg == s))
+ message => message.value == s))
}
}
}
@@ -317,7 +317,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
import scala.collection.JavaConverters._
val values = peopleCaptor.getAllValues.asScala.map(input =>
- input.msg.asInstanceOf[Option[String]].get)
+ input.value.asInstanceOf[Option[String]].get)
assert(values.mkString(",") == "1,2,22,3,33,333")
system.terminate()
Await.result(system.whenTerminated, Duration.Inf)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index 62a3bcb..fb398b8 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -115,10 +115,10 @@ object StreamSpec {
var query: String = _
override def onNext(msg: Message): Unit = {
- msg.msg match {
+ msg.value match {
case Left(wordCount: (String @unchecked, Int @unchecked)) =>
if (query != null && wordCount._1 == query) {
- taskContext.output(new Message(wordCount))
+ taskContext.output(Message(wordCount))
}
case Right(query: String) =>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index f0bccd7..281d69a 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -64,12 +64,12 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with
msgs.foreach { msg =>
runner.foreach(r =>
- when(r.process(msg.msg)).thenReturn(Some(msg.msg)))
+ when(r.process(msg.value)).thenReturn(Some(msg.value)))
}
task.onWatermarkProgress(Watermark.MAX)
msgs.foreach { msg =>
- verify(taskContext).output(MockitoMatchers.eq(Message(msg.msg, Watermark.MAX)))
+ verify(taskContext).output(MockitoMatchers.eq(Message(msg.value, Watermark.MAX)))
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 9f02cef..fb0beaa 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -66,13 +66,13 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
it should "send message and handle ack correctly" in {
val (subscription, transport) = prepare
- val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
+ val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70))
subscription.sendMessage(msg1)
verify(transport, times(1)).transport(msg1, TaskId(1, 1))
assert(subscription.minClock == 70)
- val msg2 = new Message("0", timestamp = Instant.ofEpochMilli(50))
+ val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50))
subscription.sendMessage(msg2)
verify(transport, times(1)).transport(msg2, TaskId(1, 0))
@@ -120,7 +120,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
it should "report minClock as Long.MaxValue when there is no pending message" in {
val (subscription, _) = prepare
- val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
+ val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70))
subscription.sendMessage(msg1)
assert(subscription.minClock == 70)
subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
[2/2] incubator-gearpump git commit: [GEARPUMP-312] Add Message trait
and DefaultMessage impl
Posted by ma...@apache.org.
[GEARPUMP-312] Add Message trait and DefaultMessage impl
Author: manuzhang <ow...@gmail.com>
Closes #183 from manuzhang/message_trait.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/176d8276
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/176d8276
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/176d8276
Branch: refs/heads/master
Commit: 176d82763550ff59b65038c264c63d6b11951996
Parents: 000e846
Author: manuzhang <ow...@gmail.com>
Authored: Thu May 18 14:02:51 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Thu May 18 14:03:34 2017 +0800
----------------------------------------------------------------------
.../scala/org/apache/gearpump/Message.scala | 47 ++++++++----
.../streaming/examples/complexdag/Node.scala | 4 +-
.../streaming/examples/complexdag/Sink.scala | 2 +-
.../streaming/examples/complexdag/Source.scala | 2 +-
.../examples/complexdag/NodeSpec.scala | 2 +-
.../examples/complexdag/SourceSpec.scala | 3 +-
.../examples/fsio/SeqFileStreamProcessor.scala | 2 +-
.../fsio/SeqFileStreamProducerSpec.scala | 2 +-
.../examples/kafka/wordcount/Split.scala | 4 +-
.../examples/kafka/wordcount/Sum.scala | 4 +-
.../examples/sol/SOLStreamProcessor.scala | 3 +-
.../examples/sol/SOLStreamProducer.scala | 2 +-
.../state/processor/CountProcessor.scala | 3 +-
.../processor/WindowAverageProcessor.scala | 4 +-
.../state/DefaultMessageCountAppSpec.scala | 79 ++++++++++++++++++++
.../examples/state/MessageCountAppSpec.scala | 79 --------------------
.../state/processor/CountProcessorSpec.scala | 1 -
.../streaming/examples/wordcountjava/Split.java | 3 +-
.../streaming/examples/wordcountjava/Sum.java | 4 +-
.../examples/wordcountjava/dsl/WordCount.java | 3 +-
.../streaming/examples/wordcount/Sum.scala | 7 +-
.../gearpump/akkastream/task/BatchTask.scala | 9 +--
.../gearpump/akkastream/task/ConcatTask.scala | 2 +-
.../akkastream/task/DelayInitialTask.scala | 5 +-
.../akkastream/task/DropWithinTask.scala | 4 +-
.../akkastream/task/FlattenMergeTask.scala | 2 +-
.../gearpump/akkastream/task/FoldTask.scala | 6 +-
.../akkastream/task/GroupedWithinTask.scala | 2 +-
.../akkastream/task/InterleaveTask.scala | 2 +-
.../gearpump/akkastream/task/MapAsyncTask.scala | 6 +-
.../gearpump/akkastream/task/MergeTask.scala | 2 +-
.../akkastream/task/SingleSourceTask.scala | 2 +-
.../akkastream/task/SinkBridgeTask.scala | 4 +-
.../akkastream/task/SourceBridgeTask.scala | 2 +-
.../akkastream/task/StatefulMapConcatTask.scala | 4 +-
.../akkastream/task/TakeWithinTask.scala | 4 +-
.../gearpump/akkastream/task/ThrottleTask.scala | 5 +-
.../akkastream/task/TickSourceTask.scala | 4 +-
.../gearpump/akkastream/task/Unzip2Task.scala | 6 +-
.../gearpump/akkastream/task/Zip2Task.scala | 4 +-
.../experimental/rabbitmq/RMQSink.scala | 2 +-
.../org/apache/gearpump/redis/RedisSink.scala | 2 +-
.../storm/partitioner/StormPartitioner.scala | 2 +-
.../storm/processor/StormProcessor.scala | 2 +-
.../storm/topology/GearpumpStormComponent.scala | 4 +-
.../partitioner/StormPartitionerSpec.scala | 3 +-
.../storm/util/StormOutputCollectorSpec.scala | 13 ++--
.../streaming/hadoop/SequenceFileSink.scala | 1 -
.../lib/format/DefaultSequenceFormatter.scala | 3 +-
.../hadoop/lib/format/OutputFormatter.scala | 1 -
.../gearpump/external/hbase/HBaseSink.scala | 2 +-
.../kafka/lib/sink/AbstractKafkaSink.scala | 2 +-
.../kafka/lib/source/AbstractKafkaSource.scala | 3 +-
.../source/DefaultKafkaMessageDecoderSpec.scala | 4 +-
.../dsl/partitioner/GroupByPartitioner.scala | 2 +-
.../streaming/dsl/scalaapi/Stream.scala | 2 +-
.../streaming/dsl/task/TransformTask.scala | 23 +++---
.../streaming/dsl/window/impl/Window.scala | 2 +-
.../dsl/window/impl/WindowRunner.scala | 2 +-
.../partitioner/BroadcastPartitioner.scala | 4 +-
.../streaming/partitioner/HashPartitioner.scala | 2 +-
.../gearpump/streaming/source/Watermark.scala | 3 +-
.../streaming/state/api/PersistentTask.scala | 4 +-
.../streaming/task/ExpressTransport.scala | 5 +-
.../gearpump/streaming/task/Subscription.scala | 11 +--
.../apache/gearpump/streaming/task/Task.scala | 1 -
.../gearpump/streaming/task/TaskActor.scala | 2 +-
.../gearpump/streaming/task/TaskWrapper.scala | 2 +-
.../streaming/appmaster/TaskManagerSpec.scala | 2 +-
.../dsl/plan/functions/FunctionRunnerSpec.scala | 6 +-
.../streaming/dsl/scalaapi/StreamSpec.scala | 4 +-
.../streaming/dsl/task/TransformTaskSpec.scala | 4 +-
.../streaming/task/SubscriptionSpec.scala | 6 +-
73 files changed, 233 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/core/src/main/scala/org/apache/gearpump/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala b/core/src/main/scala/org/apache/gearpump/Message.scala
index 9965565..4dc5c09 100644
--- a/core/src/main/scala/org/apache/gearpump/Message.scala
+++ b/core/src/main/scala/org/apache/gearpump/Message.scala
@@ -20,34 +20,41 @@ package org.apache.gearpump
import java.time.Instant
+trait Message {
+
+ val value: Any
+
+ val timestamp: Instant
+}
+
/**
* Each message contains an immutable timestamp.
*
* For example, if you take a picture, the time you take the picture is the
* message's timestamp.
*
- * @param msg Accept any type except Null, Nothing and Unit
+ * @param value Accept any type except Null, Nothing and Unit
*/
-case class Message(msg: Any, timeInMillis: TimeStamp) {
+case class DefaultMessage(value: Any, timeInMillis: TimeStamp) extends Message {
/**
- * @param msg Accept any type except Null, Nothing and Unit
+ * @param value Accept any type except Null, Nothing and Unit
* @param timestamp timestamp cannot be larger than Instant.ofEpochMilli(Long.MaxValue)
*/
- def this(msg: Any, timestamp: Instant) = {
- this(msg, timestamp.toEpochMilli)
+ def this(value: Any, timestamp: Instant) = {
+ this(value, timestamp.toEpochMilli)
}
/**
* Instant.EPOCH is used for default timestamp
*
- * @param msg Accept any type except Null, Nothing and Uni
+ * @param value Accept any type except Null, Nothing and Uni
*/
- def this(msg: Any) = {
- this(msg, Instant.EPOCH)
+ def this(value: Any) = {
+ this(value, Instant.EPOCH)
}
- def timestamp: Instant = {
+ override val timestamp: Instant = {
Instant.ofEpochMilli(timeInMillis)
}
}
@@ -57,17 +64,25 @@ object Message {
/**
* Instant.EPOCH is used for default timestamp
*
- * @param msg Accept any type except Null, Nothing and Uni
+ * @param value Accept any type except Null, Nothing and Unit
*/
- def apply(msg: Any): Message = {
- new Message(msg)
+ def apply(value: Any): Message = {
+ new DefaultMessage(value)
}
/**
- * @param msg Accept any type except Null, Nothing and Unit
- * @param timestamp timestamp cannot be larger than Instant.ofEpochMilli(Long.MaxValue)
+ * @param value Accept any type except Null, Nothing and Unit
+ * @param timestamp timestamp must be smaller than Long.MaxValue
+ */
+ def apply(value: Any, timestamp: TimeStamp): Message = {
+ DefaultMessage(value, timestamp)
+ }
+
+ /**
+ * @param value Accept any type except Null, Nothing and Unit
+ * @param timestamp timestamp must be smaller than Instant.ofEpochMilli(Long.MaxValue)
*/
- def apply(msg: Any, timestamp: Instant): Message = {
- new Message(msg, timestamp)
+ def apply(value: Any, timestamp: Instant): Message = {
+ new DefaultMessage(value, timestamp)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
index ddd4d1a..dbc0efa 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
@@ -30,7 +30,7 @@ class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
override def onStart(startTime: Instant): Unit = {}
override def onNext(msg: Message): Unit = {
- val list = msg.msg.asInstanceOf[Vector[String]]
- output(new Message(list :+ getClass.getCanonicalName, System.currentTimeMillis()))
+ val list = msg.value.asInstanceOf[Vector[String]]
+ output(Message(list :+ getClass.getCanonicalName, System.currentTimeMillis()))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
index e9b00a0..8cc23ff 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
@@ -35,7 +35,7 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
}
override def onNext(msg: Message): Unit = {
- val l = msg.msg.asInstanceOf[Vector[String]]
+ val l = msg.value.asInstanceOf[Vector[String]]
list.size match {
case 1 =>
l.foreach(f => {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
index 074b389..e3bc29a 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
@@ -35,7 +35,7 @@ class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContex
override def onNext(msg: Message): Unit = {
val list = Vector(getClass.getCanonicalName)
val now = Instant.now
- output(new Message(list, now.toEpochMilli))
+ output(Message(list, now.toEpochMilli))
self ! Watermark(now)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
index 241e0f6..f5376d5 100644
--- a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
@@ -36,6 +36,6 @@ class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAnd
val list = Vector(classOf[Node].getCanonicalName)
val expected = Vector(classOf[Node].getCanonicalName, classOf[Node].getCanonicalName)
node.onNext(Message(list))
- verify(context).output(argMatch[Message](_.msg == expected))
+ verify(context).output(argMatch[Message](_.value == expected))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
index 20cad1c..f445566 100644
--- a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
@@ -38,7 +38,8 @@ class SourceSpec extends WordSpec with Matchers {
val source = new Source(context, UserConfig.empty)
source.onNext(Message("start"))
- verify(context).output(argMatch[Message](Vector(classOf[Source].getCanonicalName) == _.msg))
+ verify(context).output(argMatch[Message](
+ Vector(classOf[Source].getCanonicalName) == _.value))
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
index 561346e..229f073 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
@@ -63,7 +63,7 @@ class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
}
override def onNext(msg: Message): Unit = {
- val kv = msg.msg.asInstanceOf[String].split("\\+\\+")
+ val kv = msg.value.asInstanceOf[String].split("\\+\\+")
if (kv.length >= 2) {
key.set(kv(0))
value.set(kv(1))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
index a03e68d..215cbfd 100644
--- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
@@ -79,7 +79,7 @@ class SeqFileStreamProducerSpec
val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet
verify(context).output(argMatch[Message](msg =>
- expected.contains(msg.msg.asInstanceOf[String])))
+ expected.contains(msg.value.asInstanceOf[String])))
}
after {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
index b78e788..4250a43 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala
@@ -32,8 +32,8 @@ class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext
}
override def onNext(msg: Message): Unit = {
- Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]])
+ Injection.invert[String, Array[Byte]](msg.value.asInstanceOf[Array[Byte]])
.foreach(_.split("\\s+").foreach(
- word => output(new Message(word, msg.timestamp))))
+ word => output(Message(word, msg.timestamp))))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
index 58bb884..d1a98d0 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala
@@ -33,10 +33,10 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
override def onStart(startTime: Instant): Unit = {}
override def onNext(message: Message): Unit = {
- val word = message.msg.asInstanceOf[String]
+ val word = message.value.asInstanceOf[String]
val count = wordcount.getOrElse(word, 0L) + 1
wordcount += word -> count
- output(new Message(
+ output(Message(
Injection[String, Array[Byte]](word) ->
Injection[Long, Array[Byte]](count),
message.timestamp))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
index a16cf4c..9fb581d 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala
@@ -20,10 +20,9 @@ package org.apache.gearpump.streaming.examples.sol
import java.time.Instant
import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.FiniteDuration
import akka.actor.Cancellable
-
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.{Task, TaskContext}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
index 2b443e5..973a84e 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
@@ -61,7 +61,7 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
override def onNext(msg: Message): Unit = {
val message = messages(rand.nextInt(messages.length))
- output(new Message(message, System.currentTimeMillis()))
+ output(Message(message, System.currentTimeMillis()))
messageCount = messageCount + 1L
self ! Watermark(Instant.now)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
index 4efc6e1..7f4bc22 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
@@ -38,7 +38,8 @@ class CountProcessor(taskContext: TaskContext, conf: UserConfig)
override def processMessage(state: PersistentState[Int], message: Message): Unit = {
state.update(message.timestamp.toEpochMilli, 1)
- state.get.foreach(s => taskContext.output(Message(serializer.serialize(s), message.timestamp)))
+ state.get.foreach(s => taskContext.output(
+ Message(serializer.serialize(s), message.timestamp)))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
index 8ddbedd..0052c57 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
@@ -19,10 +19,8 @@
package org.apache.gearpump.streaming.examples.state.processor
import scala.collection.immutable.TreeMap
-
import com.twitter.algebird.{AveragedGroup, AveragedValue}
import org.slf4j.Logger
-
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.monoid.AlgebirdGroup
@@ -48,7 +46,7 @@ class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig)
override def processMessage(state: PersistentState[AveragedValue],
message: Message): Unit = {
- val value = AveragedValue(message.msg.asInstanceOf[String].toLong)
+ val value = AveragedValue(message.value.asInstanceOf[String].toLong)
state.update(message.timestamp.toEpochMilli, value)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala
new file mode 100644
index 0000000..619e5d4
--- /dev/null
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.streaming.examples.state.MessageCountApp._
+
+class DefaultMessageCountAppSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+ before {
+ startActorSystem()
+ }
+
+ after {
+ shutdownActorSystem()
+ }
+
+ protected override def config = TestUtil.DEFAULT_CONFIG
+
+ property("MessageCount should succeed to submit application with required arguments") {
+ val requiredArgs = Array(
+ s"-$SOURCE_TOPIC", "source",
+ s"-$SINK_TOPIC", "sink",
+ s"-$ZOOKEEPER_CONNECT", "localhost:2181",
+ s"-$BROKER_LIST", "localhost:9092",
+ s"-$DEFAULT_FS", "hdfs://localhost:9000"
+ )
+ val optionalArgs = Array(
+ s"-$SOURCE_TASK", "2",
+ s"-$COUNT_TASK", "2",
+ s"-$SINK_TASK", "2"
+ )
+
+ val args = {
+ Table(
+ ("requiredArgs", "optionalArgs"),
+ (requiredArgs, optionalArgs.take(0)),
+ (requiredArgs, optionalArgs.take(2)),
+ (requiredArgs, optionalArgs.take(4)),
+ (requiredArgs, optionalArgs)
+ )
+ }
+
+ val masterReceiver = createMockMaster()
+ forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
+ val args = requiredArgs ++ optionalArgs
+ Future {
+ MessageCountApp.main(masterConfig, args)
+ }
+ masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+ masterReceiver.reply(SubmitApplicationResult(Success(0)))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
deleted file mode 100644
index 729994e..0000000
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.examples.state
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
-import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-import org.apache.gearpump.streaming.examples.state.MessageCountApp._
-
-class MessageCountAppSpec
- extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
-
- before {
- startActorSystem()
- }
-
- after {
- shutdownActorSystem()
- }
-
- protected override def config = TestUtil.DEFAULT_CONFIG
-
- property("MessageCount should succeed to submit application with required arguments") {
- val requiredArgs = Array(
- s"-$SOURCE_TOPIC", "source",
- s"-$SINK_TOPIC", "sink",
- s"-$ZOOKEEPER_CONNECT", "localhost:2181",
- s"-$BROKER_LIST", "localhost:9092",
- s"-$DEFAULT_FS", "hdfs://localhost:9000"
- )
- val optionalArgs = Array(
- s"-$SOURCE_TASK", "2",
- s"-$COUNT_TASK", "2",
- s"-$SINK_TASK", "2"
- )
-
- val args = {
- Table(
- ("requiredArgs", "optionalArgs"),
- (requiredArgs, optionalArgs.take(0)),
- (requiredArgs, optionalArgs.take(2)),
- (requiredArgs, optionalArgs.take(4)),
- (requiredArgs, optionalArgs)
- )
- }
-
- val masterReceiver = createMockMaster()
- forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
- val args = requiredArgs ++ optionalArgs
- Future {
- MessageCountApp.main(masterConfig, args)
- }
- masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
- masterReceiver.reply(SubmitApplicationResult(Success(0)))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
index 5affb5e..158baeb 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -33,7 +33,6 @@ import org.scalatest.{Matchers, PropSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.state.api.PersistentTask
import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig}
import org.apache.gearpump.streaming.task.UpdateCheckpointClock
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
index a0996b3..22425e3 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
@@ -18,6 +18,7 @@
package org.apache.gearpump.streaming.examples.wordcountjava;
+import org.apache.gearpump.DefaultMessage;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.javaapi.Task;
@@ -45,7 +46,7 @@ public class Split extends Task {
// Split the TEXT to words
String[] words = TEXT.split(" ");
for (int i = 0; i < words.length; i++) {
- context.output(new Message(words[i], Instant.now().toEpochMilli()));
+ context.output(DefaultMessage.apply(words[i], Instant.now().toEpochMilli()));
}
self().tell(new Watermark(Instant.now()), self());
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
index 3daa6e0..41cdbdc 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java
@@ -42,8 +42,8 @@ public class Sum extends Task {
}
@Override
- public void onNext(Message messagePayLoad) {
- String word = (String) (messagePayLoad.msg());
+ public void onNext(Message message) {
+ String word = (String) (message.value());
Integer current = wordCount.get(word);
if (current == null) {
current = 0;
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index d8262fd..2830b16 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -19,6 +19,7 @@
package org.apache.gearpump.streaming.examples.wordcountjava.dsl;
import com.typesafe.config.Config;
+import org.apache.gearpump.DefaultMessage;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.ClusterConfig;
import org.apache.gearpump.cluster.UserConfig;
@@ -79,7 +80,7 @@ public class WordCount {
@Override
public Message read() {
- return Message.apply(str, Instant.now());
+ return new DefaultMessage(str, Instant.now());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
index dbefc93..6f482fa 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala
@@ -20,11 +20,10 @@ package org.apache.gearpump.streaming.examples.wordcount
import java.time.Instant
import java.util.concurrent.TimeUnit
+
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
-
import akka.actor.Cancellable
-
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.{Task, TaskContext}
@@ -45,9 +44,9 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
override def onNext(msg: Message): Unit = {
if (null != msg) {
- val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L)
+ val current = map.getOrElse(msg.value.asInstanceOf[String], 0L)
wordCount += 1
- map.put(msg.msg.asInstanceOf[String], current + 1)
+ map.put(msg.value.asInstanceOf[String], current + 1)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
index 5c2485b..3d412ff 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
@@ -18,15 +18,10 @@
package org.apache.gearpump.akkastream.task
-import java.util.concurrent.TimeUnit
-
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-
class BatchTask[In, Out](context: TaskContext, userConf : UserConfig)
extends GraphTask(context, userConf) {
@@ -35,8 +30,8 @@ class BatchTask[In, Out](context: TaskContext, userConf : UserConfig)
val aggregate = userConf.getValue[(Out, In) => Out](BatchTask.AGGREGATE)
val seed = userConf.getValue[In => Out](BatchTask.SEED)
- override def onNext(msg : Message) : Unit = {
- val data = msg.msg.asInstanceOf[In]
+ override def onNext(msg: Message) : Unit = {
+ val data = msg.value.asInstanceOf[In]
val time = msg.timestamp
context.output(msg)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
index b77b9bd..94954c0 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
@@ -28,7 +28,7 @@ class ConcatTask(context: TaskContext, userConf : UserConfig)
val sizeOfOutputs = sizeOfOutPorts
var index = 0
- override def onNext(msg : Message) : Unit = {
+ override def onNext(msg: Message) : Unit = {
output(index, msg)
index += 1
if (index == sizeOfOutputs) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
index ae91d1f..602f732 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
@@ -25,7 +25,6 @@ import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
case object DelayInitialTime
@@ -42,8 +41,8 @@ class DelayInitialTask[T](context: TaskContext, userConf : UserConfig)
self ! Message(DelayInitialTime, Instant.now())
)
}
- override def onNext(msg : Message) : Unit = {
- msg.msg match {
+ override def onNext(msg: Message) : Unit = {
+ msg.value match {
case DelayInitialTime =>
delayInitialActive = false
case _ =>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
index 4c19de5..c0756e3 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
@@ -42,8 +42,8 @@ class DropWithinTask[T](context: TaskContext, userConf : UserConfig)
)
}
- override def onNext(msg : Message) : Unit = {
- msg.msg match {
+ override def onNext(msg: Message) : Unit = {
+ msg.value match {
case DropWithinTimeout =>
timeoutActive = false
case _ =>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
index 14ff537..d3e815a 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
@@ -28,7 +28,7 @@ class FlattenMergeTask(context: TaskContext, userConf : UserConfig)
val sizeOfOutputs = sizeOfOutPorts
var index = 0
- override def onNext(msg : Message) : Unit = {
+ override def onNext(msg: Message) : Unit = {
output(index, msg)
index += 1
if (index == sizeOfOutputs) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
index d982ebd..8de9f8d 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
@@ -38,13 +38,13 @@ class FoldTask[In, Out](context: TaskContext, userConf : UserConfig)
})
}
- override def onNext(msg : Message) : Unit = {
- val data = msg.msg.asInstanceOf[In]
+ override def onNext(msg: Message) : Unit = {
+ val data = msg.value.asInstanceOf[In]
val time = msg.timestamp
aggregator.foreach(func => {
aggregated = func(aggregated, data)
LOG.info(s"aggregated = $aggregated")
- val msg = new Message(aggregated, time)
+ val msg = Message(aggregated, time)
context.output(msg)
})
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
index eaf2b3f..12e2d40 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
@@ -33,7 +33,7 @@ class GroupedWithinTask[T](context: TaskContext, userConf : UserConfig)
val timeWindow = userConf.getValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW)
val batchSize = userConf.getInt(GroupedWithinTask.BATCH_SIZE)
- override def onNext(msg : Message) : Unit = {
+ override def onNext(msg: Message) : Unit = {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
index 741ec43..908e21e 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
@@ -29,7 +29,7 @@ class InterleaveTask(context: TaskContext, userConf : UserConfig)
var index = 0
// TODO access upstream and pull
- override def onNext(msg : Message) : Unit = {
+ override def onNext(msg: Message) : Unit = {
output(index, msg)
index += 1
if (index == sizeOfInputs) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
index daa1afc..c500ba2 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
@@ -30,15 +30,15 @@ class MapAsyncTask[In, Out](context: TaskContext, userConf : UserConfig)
val f = userConf.getValue[In => Future[Out]](MapAsyncTask.MAPASYNC_FUNC)
implicit val ec = context.system.dispatcher
- override def onNext(msg : Message) : Unit = {
- val data = msg.msg.asInstanceOf[In]
+ override def onNext(msg: Message) : Unit = {
+ val data = msg.value.asInstanceOf[In]
val time = msg.timestamp
f match {
case Some(func) =>
val fout = func(data)
fout.onComplete(value => {
value.foreach(out => {
- val msg = new Message(out, time)
+ val msg = Message(out, time)
context.output(msg)
})
})
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
index ad18f72..1ecc4d0 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
@@ -28,7 +28,7 @@ class MergeTask(context: TaskContext, userConf : UserConfig)
val eagerComplete = userConf.getBoolean(MergeTask.EAGER_COMPLETE)
val inputPorts = userConf.getInt(MergeTask.INPUT_PORTS)
- override def onNext(msg : Message) : Unit = {
+ override def onNext(msg: Message) : Unit = {
context.output(msg)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
index 5bea47e..bff4e76 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
@@ -27,7 +27,7 @@ class SingleSourceTask[T](context: TaskContext, userConf : UserConfig)
val elem = userConf.getValue[T](SingleSourceTask.ELEMENT).get
- override def onNext(msg : Message) : Unit = {
+ override def onNext(msg: Message) : Unit = {
context.output(Message(elem, msg.timestamp))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
index 1b9c4e3..d92e24c 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
@@ -61,7 +61,7 @@ class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig)
override def onStart(startTime : Instant) : Unit = {}
- override def onNext(msg : Message) : Unit = {
+ override def onNext(msg: Message) : Unit = {
queue.add(msg)
trySendingData()
}
@@ -71,7 +71,7 @@ class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig)
private def trySendingData(): Unit = {
if (subscriber != null) {
(0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg =>
- subscriber ! msg.msg
+ subscriber ! msg.value
request -= 1
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
index 5b64a52..b7fd9c3 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
@@ -53,7 +53,7 @@ class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig)
override def onStart(startTime : Instant) : Unit = {}
- override def onNext(msg : Message) : Unit = {
+ override def onNext(msg: Message) : Unit = {
LOG.info("AkkaStreamSource receiving message " + msg)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
index b776f2c..d8beeb5 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
@@ -34,8 +34,8 @@ class StatefulMapConcatTask[IN, OUT](context: TaskContext, userConf : UserConfig
f = func()
}
- override def onNext(msg : Message) : Unit = {
- val in: IN = msg.msg.asInstanceOf[IN]
+ override def onNext(msg: Message) : Unit = {
+ val in: IN = msg.value.asInstanceOf[IN]
val out: Iterable[OUT] = f(in)
val iterator = out.iterator
while(iterator.hasNext) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
index 7aa4e8e..689a6b3 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
@@ -42,8 +42,8 @@ class TakeWithinTask[T](context: TaskContext, userConf : UserConfig)
)
}
- override def onNext(msg : Message) : Unit = {
- msg.msg match {
+ override def onNext(msg: Message) : Unit = {
+ msg.value match {
case DropWithinTimeout =>
timeoutActive = true
case _ =>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
index 3c7ad87..ef1e35f 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
@@ -24,7 +24,6 @@ import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
class ThrottleTask[T](context: TaskContext, userConf : UserConfig)
@@ -38,8 +37,8 @@ class ThrottleTask[T](context: TaskContext, userConf : UserConfig)
val interval = timePeriod.toNanos / cost
// TODO control rate from TaskActor
- override def onNext(msg : Message) : Unit = {
- val data = msg.msg.asInstanceOf[T]
+ override def onNext(msg: Message) : Unit = {
+ val data = msg.value.asInstanceOf[T]
val time = msg.timestamp
context.output(msg)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
index a10e138..086fd48 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
@@ -19,7 +19,6 @@
package org.apache.gearpump.akkastream.task
import java.time.Instant
-import java.util.Date
import java.util.concurrent.TimeUnit
import org.apache.gearpump.Message
@@ -33,7 +32,6 @@ class TickSourceTask[T](context: TaskContext, userConf : UserConfig)
val initialDelay = userConf.getValue[FiniteDuration](TickSourceTask.INITIAL_DELAY).
getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
- (TickSourceTask.INITIAL_DELAY)
val interval = userConf.getValue[FiniteDuration](TickSourceTask.INTERVAL).
getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
val tick = userConf.getValue[T](TickSourceTask.TICK).get
@@ -44,7 +42,7 @@ class TickSourceTask[T](context: TaskContext, userConf : UserConfig)
)
}
- override def onNext(msg : Message) : Unit = {
+ override def onNext(msg: Message) : Unit = {
context.output(msg)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
index 005d018..7f39ed7 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
@@ -29,10 +29,10 @@ class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig)
val unzip = userConf.
getValue[UnZipFunction[In, A1, A2]](Unzip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
- override def onNext(msg : Message) : Unit = {
- val message = msg.msg
+ override def onNext(msg: Message) : Unit = {
+ val value = msg.value
val time = msg.timestamp
- val pair = unzip(message.asInstanceOf[In])
+ val pair = unzip(value.asInstanceOf[In])
val (a, b) = pair
output(0, Message(a.asInstanceOf[AnyRef], time))
output(1, Message(b.asInstanceOf[AnyRef], time))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
index 7e0c082..ab0116c 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
@@ -31,8 +31,8 @@ class Zip2Task[A1, A2, OUT](context: TaskContext, userConf : UserConfig)
var a1: Option[A1] = None
var a2: Option[A2] = None
- override def onNext(msg : Message) : Unit = {
- val message = msg.msg
+ override def onNext(msg: Message) : Unit = {
+ val message = msg.value
val time = msg.timestamp
a1 match {
case Some(x) =>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
----------------------------------------------------------------------
diff --git a/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
index 492fffe..41c6499 100644
--- a/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
+++ b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala
@@ -49,7 +49,7 @@ class RMQSink(userConfig: UserConfig,
}
override def write(message: Message): Unit = {
- publish(message.msg)
+ publish(message.value)
}
override def close(): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
index 36a9fe3..9afb1fe 100644
--- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
@@ -59,7 +59,7 @@ class RedisSink(
}
override def write(message: Message): Unit = {
- message.msg match {
+ message.value match {
// GEO
case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
index 4969314..86fc0ec 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
@@ -42,7 +42,7 @@ private[storm] class StormPartitioner(target: String) extends MulticastPartition
override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int)
: Array[Int] = {
- val stormTuple = msg.msg.asInstanceOf[GearpumpTuple]
+ val stormTuple = msg.value.asInstanceOf[GearpumpTuple]
stormTuple.targetPartitions.getOrElse(target, Array(Partitioner.UNKNOWN_PARTITION_ID))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
index e3b45fb..2f1b77b 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
@@ -20,8 +20,8 @@ package org.apache.gearpump.experiments.storm.processor
import java.time.Instant
import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.Duration
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index 4536277..248ca44 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.experiments.storm.util.StormConstants._
import org.apache.gearpump.experiments.storm.util.StormUtil._
import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil}
import org.apache.gearpump.streaming.DAG
-import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext}
+import org.apache.gearpump.streaming.task.{GetDAG, TaskContext, TaskId}
import org.apache.gearpump.util.{Constants, LogUtil}
import org.apache.gearpump.{Message, TimeStamp}
import org.slf4j.Logger
@@ -219,7 +219,7 @@ object GearpumpStormComponent {
override def next(message: Message): Unit = {
val timestamp = message.timestamp.toEpochMilli
collector.setTimestamp(timestamp)
- bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext,
+ bolt.execute(message.value.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext,
timestamp))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
index 5fc631b..aabb7c1 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
@@ -52,7 +52,8 @@ class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers {
targetPartitions.foreach {
case (target, ps) => {
val partitioner = new StormPartitioner(target)
- ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1, currentPartitionId)
+ ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1,
+ currentPartitionId)
}
}
val partitionNum = id
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
index 6b894da..05627c9 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -18,8 +18,8 @@
package org.apache.gearpump.experiments.storm.util
import java.util.{List => JList, Map => JMap}
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters._
import backtype.storm.generated.Grouping
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -27,8 +27,7 @@ import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-
-import org.apache.gearpump.{Message, MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
import org.apache.gearpump.streaming.MockUtil
@@ -63,9 +62,9 @@ class StormOutputCollectorSpec
stormOutputCollector.setTimestamp(timestamp)
stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds
verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
- case Message(tuple: GearpumpTuple, t) =>
+ message: Message =>
val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
- tuple == expected && t == timestamp
+ message.value == expected && message.timestamp.toEpochMilli == timestamp
}))
}
}
@@ -96,11 +95,11 @@ class StormOutputCollectorSpec
stormOutputCollector.emitDirect(id, streamId, values)
val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index)
verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
- case Message(tuple: GearpumpTuple, t) => {
+ message: Message => {
val expected = new GearpumpTuple(values, stormTaskId, streamId,
Map(target -> partitions))
- val result = tuple == expected && t == timestamp
+ val result = message.value == expected && message.timestamp.toEpochMilli == timestamp
result
}
}))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
index bb56003..7d0838e 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala
@@ -22,7 +22,6 @@ import java.text.SimpleDateFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.HdfsConfiguration
import org.apache.hadoop.io.SequenceFile
-
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
index 822eb5f..04e4781 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
@@ -18,13 +18,12 @@
package org.apache.gearpump.streaming.hadoop.lib.format
import org.apache.hadoop.io.{LongWritable, Text, Writable}
-
import org.apache.gearpump.Message
class DefaultSequenceFormatter extends OutputFormatter {
override def getKey(message: Message): Writable = new LongWritable(message.timestamp.toEpochMilli)
- override def getValue(message: Message): Writable = new Text(message.msg.asInstanceOf[String])
+ override def getValue(message: Message): Writable = new Text(message.value.asInstanceOf[String])
override def getKeyClass: Class[_ <: Writable] = classOf[LongWritable]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
index 435d0fc..2e5874c 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
@@ -18,7 +18,6 @@
package org.apache.gearpump.streaming.hadoop.lib.format
import org.apache.hadoop.io.Writable
-
import org.apache.gearpump.Message
trait OutputFormatter extends Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
index 4b41ba1..f5e6483 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
@@ -84,7 +84,7 @@ class HBaseSink(userConfig: UserConfig, tableName: String,
}
override def write(message: Message): Unit = {
- put(message.msg)
+ put(message.value)
}
def close(): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
index e5534a6..76b4c0b 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
@@ -68,7 +68,7 @@ abstract class AbstractKafkaSink private[kafka](
}
override def write(message: Message): Unit = {
- message.msg match {
+ message.value match {
case (k: Array[Byte], v: Array[Byte]) =>
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, k, v)
producer.send(record)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index a8bda50..d5a8729 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -28,7 +28,7 @@ import org.apache.gearpump.streaming.kafka.lib.KafkaMessageDecoder
import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory
import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
import KafkaClient.KafkaClientFactory
-import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, FetchThread}
+import org.apache.gearpump.streaming.kafka.lib.source.consumer.{FetchThread, KafkaMessage}
import org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper
import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
import org.apache.gearpump.streaming.kafka.util.KafkaConfig
@@ -97,6 +97,7 @@ abstract class AbstractKafkaSource(
/**
* Reads a record from incoming queue, decodes, filters and checkpoints offsets
* before returns a Message. Message can be null if the incoming queue is empty.
+ *
* @return a [[org.apache.gearpump.Message]] or null
*/
override def read(): Message = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
index 9f29022..2b52d76 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
@@ -18,8 +18,6 @@
package org.apache.gearpump.streaming.kafka.lib.source
-import java.time.Instant
-
import com.twitter.bijection.Injection
import org.scalacheck.Gen
import org.scalatest.prop.PropertyChecks
@@ -34,7 +32,7 @@ class DefaultKafkaMessageDecoderSpec extends PropSpec with PropertyChecks with M
val msgAndWmk = decoder.fromBytes(kbytes, vbytes)
val message = msgAndWmk.message
val watermark = msgAndWmk.watermark
- message.msg shouldBe vbytes
+ message.value shouldBe vbytes
// processing time as message timestamp and watermark
message.timestamp shouldBe watermark
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
index 7e1214e..3789d4e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.partitioner.UnicastPartitioner
class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner {
override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- val hashCode = fn(message.msg.asInstanceOf[T]).hashCode()
+ val hashCode = fn(message.value.asInstanceOf[T]).hashCode()
(hashCode & Integer.MAX_VALUE) % partitionNum
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index 9a614e8..9c5e347 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -291,7 +291,7 @@ class LoggerSink[T] extends DataSink {
}
override def write(message: Message): Unit = {
- logger.info("logging message " + message.msg)
+ logger.info("logging message " + message.value)
}
override def close(): Unit = Unit
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 86ac933..9571697 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -42,18 +42,19 @@ object TransformTask {
val processor = operator.map(FunctionRunner.withEmitFn(_,
(out: OUT) => taskContext.output(Message(out, watermarkTime))))
processor.foreach(_.setup())
- buffer.foreach { case message@Message(in, time) =>
- if (time < watermarkTime) {
- processor match {
- case Some(p) =>
- // .toList forces eager evaluation
- p.process(in.asInstanceOf[IN]).toList
- case None =>
- taskContext.output(Message(in, watermarkTime))
+ buffer.foreach {
+ message: Message =>
+ if (message.timestamp.toEpochMilli < watermarkTime) {
+ processor match {
+ case Some(p) =>
+ // .toList forces eager evaluation
+ p.process(message.value.asInstanceOf[IN]).toList
+ case None =>
+ taskContext.output(Message(message.value, watermarkTime))
+ }
+ } else {
+ nextBuffer +:= message
}
- } else {
- nextBuffer +:= message
- }
}
// .toList forces eager evaluation
processor.map(_.finish().toList)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 5f9d19b..05ce74e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -68,7 +68,7 @@ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Windo
case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) {
def groupBy(message: Message): (GROUP, List[Window]) = {
- val ele = message.msg.asInstanceOf[T]
+ val ele = message.value.asInstanceOf[T]
val group = groupByFn(ele)
val windows = window.windowFn(new WindowFunction.Context[T] {
override def element: T = ele
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 42d50e2..74749b9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -58,7 +58,7 @@ class DefaultWindowRunner[IN, GROUP, OUT](
private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
override def process(message: Message): Unit = {
- val input = message.msg.asInstanceOf[IN]
+ val input = message.value.asInstanceOf[IN]
val (group, windows) = groupBy.groupBy(message)
if (!groupedWindowInputs.containsKey(group)) {
groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]())
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
index 9b63e04..36c331a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
@@ -25,8 +25,8 @@ class BroadcastPartitioner extends MulticastPartitioner {
private var lastPartitionNum = -1
private var partitions = Array.empty[Int]
- override def getPartitions(
- msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+ override def getPartitions(msg: Message, partitionNum: Int,
+ currentPartitionId: Int): Array[Int] = {
if (partitionNum != lastPartitionNum) {
partitions = (0 until partitionNum).toArray
lastPartitionNum = partitionNum
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
index 6137705..dc48741 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
@@ -27,6 +27,6 @@ import org.apache.gearpump.Message
*/
class HashPartitioner extends UnicastPartitioner {
override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
+ (msg.value.hashCode() & Integer.MAX_VALUE) % partitionNum
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 4371257..0ec2b6f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -19,8 +19,7 @@ package org.apache.gearpump.streaming.source
import java.time.Instant
-import org.apache.gearpump.Message
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS}
+import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message}
/**
* message used by source task to report source watermark.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index f9f5b33..d3ffaa9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -22,7 +22,7 @@ import java.time.Instant
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
import org.apache.gearpump.util.LogUtil
import org.apache.gearpump.{Message, TimeStamp}
@@ -79,7 +79,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
}
final override def onNext(message: Message): Unit = {
- checkpointManager.update(message.timeInMillis)
+ checkpointManager.update(message.timestamp.toEpochMilli)
.foreach(state.setNextCheckpointTime)
processMessage(state, message)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
index 3cc1cd0..6947dd4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
@@ -19,7 +19,6 @@
package org.apache.gearpump.streaming.task
import akka.actor.{ActorRef, ExtendedActorSystem}
-
import org.apache.gearpump.Message
import org.apache.gearpump.transport.netty.TaskMessage
import org.apache.gearpump.transport.{Express, HostPort}
@@ -55,8 +54,8 @@ trait ExpressTransport {
if (null == serializedMessage) {
msg match {
case message: Message =>
- val bytes = serializerPool.get().serialize(message.msg)
- serializedMessage = SerializedMessage(message.timeInMillis, bytes)
+ val bytes = serializerPool.get().serialize(message.value)
+ serializedMessage = SerializedMessage(message.timestamp.toEpochMilli, bytes)
case _ => serializedMessage = msg
}
}