You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:34 UTC
[24/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
index bee81fb..9cb7ca0 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,13 +19,13 @@
package io.gearpump.streaming.source
import io.gearpump.streaming.task.TaskContext
-import io.gearpump.{TimeStamp, Message}
+import io.gearpump.{Message, TimeStamp}
/**
- * interface to implement custom source where data is read into the system.
+ * Interface to implement custom source where data is read into the system.
* a DataSource could be a message queue like kafka or simply data generation source.
*
- * an example would be like
+ * An example would be like
* {{{
* GenStringSource extends DataSource {
*
@@ -44,7 +44,7 @@ import io.gearpump.{TimeStamp, Message}
trait DataSource extends java.io.Serializable {
/**
- * open connection to data source
+ * Opens connection to data source
* invoked in onStart() method of [[io.gearpump.streaming.source.DataSourceTask]]
* @param context is the task context at runtime
* @param startTime is the start time of system
@@ -52,7 +52,7 @@ trait DataSource extends java.io.Serializable {
def open(context: TaskContext, startTime: Option[TimeStamp]): Unit
/**
- * read a number of messages from data source.
+ * Reads a number of messages from data source.
* invoked in each onNext() method of [[io.gearpump.streaming.source.DataSourceTask]]
* @param batchSize max number of messages to read
* @return a list of messages wrapped in [[io.gearpump.Message]]
@@ -60,7 +60,7 @@ trait DataSource extends java.io.Serializable {
def read(batchSize: Int): List[Message]
/**
- * close connection to data source.
+ * Closes connection to data source.
* invoked in onStop() method of [[io.gearpump.streaming.source.DataSourceTask]]
*/
def close(): Unit
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
index e7c0599..6ca939f 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,5 +22,4 @@ object DataSourceConfig {
val SOURCE_READ_BATCH_SIZE = "gearpump.source.read.batch.size"
val SOURCE_TIMESTAMP_FILTER = "gearpump.source.timestamp.filter.class"
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
index 61eabcc..384b86a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,14 +19,15 @@
package io.gearpump.streaming.source
import akka.actor.ActorSystem
-import io.gearpump.streaming.Processor
+
import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.Processor
/**
- * utility that helps user to create a DAG starting with [[DataSourceTask]]
+ * Utility that helps user to create a DAG starting with [[DataSourceTask]]
* user should pass in a [[DataSource]]
*
- * here is an example to build a DAG that reads from Kafka source followed by word count
+ * Here is an example to build a DAG that reads from Kafka source followed by word count
* {{{
* val source = new KafkaSource()
* val sourceProcessor = DataSourceProcessor(source, 1)
@@ -36,10 +37,12 @@ import io.gearpump.cluster.UserConfig
* }}}
*/
object DataSourceProcessor {
- def apply(dataSource: DataSource,
- parallelism: Int,
- description: String = "",
- taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem): Processor[DataSourceTask] = {
+ def apply(
+ dataSource: DataSource,
+ parallelism: Int,
+ description: String = "",
+ taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
+ : Processor[DataSourceTask] = {
Processor[DataSourceTask](parallelism, description = description,
taskConf.withValue[DataSource](DataSourceTask.DATA_SOURCE, dataSource))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
index 4c65100..d9b2110 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,26 +18,27 @@
package io.gearpump.streaming.source
-import io.gearpump.streaming.task.{Task, StartTime, TaskContext}
import io.gearpump._
import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
object DataSourceTask {
val DATA_SOURCE = "data_source"
}
/**
- * general task that runs any [[DataSource]]
- * see [[DataSourceProcessor]] for its usage
+ * Task container for [[io.gearpump.streaming.source.DataSource]].
+ * See [[io.gearpump.streaming.source.DataSourceProcessor]] for its usage
*
- * DataSourceTask calls
- * - `DataSource.open` in `onStart` and pass in [[io.gearpump.streaming.task.TaskContext]] and application start time
- * - `DataSource.read` in each `onNext`, which reads a batch of messages whose size are defined by
- * `gearpump.source.read.batch.size`.
- * - `DataSource.close` in `onStop`
+ * DataSourceTask calls:
+ * - `DataSource.open()` in `onStart` and pass in [[io.gearpump.streaming.task.TaskContext]]
+ * and application start time
+ * - `DataSource.read()` in each `onNext`, which reads a batch of messages whose size is
+ * defined by `gearpump.source.read.batch.size`.
+ * - `DataSource.close()` in `onStop`
*/
class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) {
- import DataSourceTask._
+ import io.gearpump.streaming.source.DataSourceTask._
private val source = conf.getValue[DataSource](DATA_SOURCE).get
private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
index a9ee674..b7d4e90 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,10 +19,10 @@
package io.gearpump.streaming.source
import io.gearpump.streaming.transaction.api.TimeStampFilter
-import io.gearpump.{TimeStamp, Message}
+import io.gearpump.{Message, TimeStamp}
/**
- * default TimeStampFilter that filters out messages with smaller timestamps
+ * TimeStampFilter filters out messages which have obsolete (smaller) timestamps.
*/
class DefaultTimeStampFilter extends TimeStampFilter {
override def filter(msg: Message, predicate: TimeStamp): Option[Message] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
index a25e20e..dfe3e93 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,10 +19,10 @@
package io.gearpump.streaming.state.api
trait Monoid[T] extends java.io.Serializable {
- def plus(l: T, r: T): T
- def zero: T
+ def plus(l: T, r: T): T
+ def zero: T
}
trait Group[T] extends Monoid[T] {
- def minus(l: T, r: T): T
- }
\ No newline at end of file
+ def minus(l: T, r: T): T
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
index dadbba6..238eab4 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -28,9 +28,9 @@ import io.gearpump.TimeStamp
* the incoming value using monoid.plus to get a new state value
*/
abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] {
- // left state updated by messages before checkpoint time
+ // Left state updated by messages before checkpoint time
private[state] var left: T = monoid.zero
- // right state updated by message after checkpoint time
+ // Right state updated by message after checkpoint time
private[state] var right: T = monoid.zero
protected var checkpointTime = Long.MaxValue
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
index 6c595da..f1b923a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -30,33 +30,32 @@ import io.gearpump._
trait PersistentState[T] {
/**
- * recover state to a previous checkpoint
+ * Recovers state to a previous checkpoint
* usually invoked by the framework
*/
def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit
/**
- * update state on a new message
+ * Updates state on a new message
* this is invoked by user
*/
def update(timestamp: TimeStamp, t: T): Unit
/**
- * set next checkpoint time
+ * Sets next checkpoint time
* should be invoked by the framework
*/
def setNextCheckpointTime(timeStamp: TimeStamp): Unit
/**
- * get a binary snapshot of state
+ * Gets a binary snapshot of state
* usually invoked by the framework
*/
def checkpoint(): Array[Byte]
/**
- * unwrap the raw value of state
+ * Unwraps the raw value of state
*/
def get: Option[T]
}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
index fbf507f..a8a5d5d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,16 +19,15 @@
package io.gearpump.streaming.state.api
import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.state.impl.{PersistentStateConfig, CheckpointManager}
+import io.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime, Task, TaskContext}
import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
import io.gearpump.util.LogUtil
import io.gearpump.{Message, TimeStamp}
-import scala.concurrent.duration.FiniteDuration
-
object PersistentTask {
val CHECKPOINT = Message("checkpoint")
val LOG = LogUtil.getLogger(getClass)
@@ -42,35 +41,32 @@ object PersistentTask {
*/
abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {
- import io.gearpump.streaming.state.api.PersistentTask._
import taskContext._
- val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
+ import io.gearpump.streaming.state.api.PersistentTask._
+
+ val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
+ PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, taskContext)
val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore)
- // system time interval to attempt checkpoint
+ // System time interval to attempt checkpoint
private val checkpointAttemptInterval = 1000L
/**
- * subclass should override this method to pass in
- * a PersistentState
- *
- * the framework has already offered two states
- *
- * - NonWindowState
- * state with no time or other boundary
- * - WindowState
- * each state is bounded by a time window
+ * Subclass should override this method to pass in a PersistentState. The framework has already
+ * offered two states:
+ * - NonWindowState: state with no time or other boundary
+ * - WindowState: each state is bounded by a time window
*/
def persistentState: PersistentState[T]
/**
- * subclass should override this method to specify how a
- * new message should update state
+ * Subclass should override this method to specify how a new message should update state
*/
def processMessage(state: PersistentState[T], message: Message): Unit
+ /** Persistent state that will be stored (by checkpointing) automatically to storage like HDFS */
val state = persistentState
final override def onStart(startTime: StartTime): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala b/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
index 4b8e2f5..f87b224 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,6 @@ package io.gearpump.streaming.state.api
import scala.util.Try
trait Serializer[T] extends java.io.Serializable {
- def serialize(t: T): Array[Byte]
- def deserialize(bytes: Array[Byte]): Try[T]
- }
+ def serialize(t: T): Array[Byte]
+ def deserialize(bytes: Array[Byte]): Try[T]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
index 5fcbbcb..76c91c7 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,7 @@ package io.gearpump.streaming.state.impl
import io.gearpump.TimeStamp
import io.gearpump.streaming.transaction.api.CheckpointStore
+/** Manages physical checkpoints to persistent storage like HDFS */
class CheckpointManager(checkpointInterval: Long,
checkpointStore: CheckpointStore) {
@@ -34,7 +35,7 @@ class CheckpointManager(checkpointInterval: Long,
def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = {
checkpointStore.persist(timestamp, checkpoint)
checkpointTime = checkpointTime.collect { case time if maxMessageTime > time =>
- time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval
+ time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval
}
checkpointTime
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
index 164b932..21623e3 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,7 @@ package io.gearpump.streaming.state.impl
import io.gearpump.TimeStamp
import io.gearpump.cluster.UserConfig
import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.transaction.api.{CheckpointStoreFactory, CheckpointStore}
+import io.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}
/**
* an in memory store provided for test
@@ -41,7 +41,6 @@ class InMemoryCheckpointStore extends CheckpointStore {
override def close(): Unit = {
checkpoints = Map.empty[TimeStamp, Array[Byte]]
}
-
}
class InMemoryCheckpointStoreFactory extends CheckpointStoreFactory {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
index aefd7e1..dcd3918 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,22 @@
package io.gearpump.streaming.state.impl
+import org.slf4j.Logger
+
import io.gearpump.TimeStamp
import io.gearpump.streaming.state.api.{Monoid, MonoidState, Serializer}
-import io.gearpump.util.LogUtil
import io.gearpump.streaming.state.impl.NonWindowState._
-import org.slf4j.Logger
+import io.gearpump.util.LogUtil
object NonWindowState {
val LOG: Logger = LogUtil.getLogger(classOf[NonWindowState[_]])
}
/**
- * a MonoidState storing non-window state
+ * A MonoidState storing non-window state
*/
-class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T]) extends MonoidState[T](monoid) {
+class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T])
+ extends MonoidState[T](monoid) {
override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
serializer.deserialize(bytes).foreach(left = _)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
index ecbb092..d7488d7 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
index 13f0eef..63cdf06 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,7 +20,7 @@ package io.gearpump.streaming.state.impl
import io.gearpump.TimeStamp
/**
- * used in window applications
+ * Used in window applications
* it keeps the current window and slide ahead when the window expires
*/
class Window(val windowSize: Long, val windowStep: Long) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
index 41395f6..d7d3776 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
index 007c9a4..30382c0 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
@@ -2,12 +2,12 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding cstateyright ownership. The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
- * with the License. You may obtain a cstatey of the License at
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
package io.gearpump.streaming.state.impl
+import scala.collection.immutable.TreeMap
+
+import org.slf4j.Logger
+
import io.gearpump.TimeStamp
-import io.gearpump.streaming.state.api.{Group, Serializer, MonoidState}
-import io.gearpump.streaming.task.TaskContext
+import io.gearpump.streaming.state.api.{Group, MonoidState, Serializer}
import io.gearpump.streaming.state.impl.WindowState._
+import io.gearpump.streaming.task.TaskContext
import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.collection.immutable.TreeMap
/**
* an interval is a dynamic time range that is divided by window boundary and checkpoint time
@@ -51,10 +52,9 @@ object WindowState {
* possible to undo the update by messages that have left the window
*/
class WindowState[T](group: Group[T],
- serializer: Serializer[TreeMap[Interval, T]],
- taskContext: TaskContext,
- window: Window)
- extends MonoidState[T](group) {
+ serializer: Serializer[TreeMap[Interval, T]],
+ taskContext: TaskContext,
+ window: Window) extends MonoidState[T](group) {
/**
* each interval has a state updated by message with timestamp in
* [interval.startTime, interval.endTime)
@@ -67,11 +67,11 @@ class WindowState[T](group: Group[T],
window.slideTo(timestamp)
serializer.deserialize(bytes)
.foreach { states =>
- intervalStates = states
- left = states.foldLeft(left) { case (accum, iter) =>
- group.plus(accum, iter._2)
+ intervalStates = states
+ left = states.foldLeft(left) { case (accum, iter) =>
+ group.plus(accum, iter._2)
+ }
}
- }
}
override def update(timestamp: TimeStamp, t: T): Unit = {
@@ -115,14 +115,17 @@ class WindowState[T](group: Group[T],
}
/**
- * each message will update state in corresponding Interval[StartTime, endTime),
+ * Each message will update state in corresponding Interval[StartTime, endTime),
+ *
* which is decided by the message's timestamp t where
- * startTime = Math.max(lowerBound1, lowerBound2, checkpointTime)
- * endTime = Math.min(upperBound1, upperBound2, checkpointTime)
- * lowerBound1 = step * Nmax1 <= t
- * lowerBound2 = step * Nmax2 + size <= t
- * upperBound1 = step * Nmin1 > t
- * upperBound2 = step * Nmin2 + size > t
+ * {{{
+ * startTime = Math.max(lowerBound1, lowerBound2, checkpointTime)
+ * endTime = Math.min(upperBound1, upperBound2, checkpointTime)
+ * lowerBound1 = step * Nmax1 <= t
+ * lowerBound2 = step * Nmax2 + size <= t
+ * upperBound1 = step * Nmin1 > t
+ * upperBound2 = step * Nmin2 + size > t
+ * }}}
*/
private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: TimeStamp): Interval = {
val windowSize = window.windowSize
@@ -144,7 +147,8 @@ class WindowState[T](group: Group[T],
}
}
- private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp): Unit = {
+ private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp)
+ : Unit = {
val interval = getInterval(timestamp, checkpointTime)
intervalStates.get(interval) match {
case Some(st) =>
@@ -154,8 +158,8 @@ class WindowState[T](group: Group[T],
}
}
- private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp): TreeMap[Interval, T] = {
+ private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp)
+ : TreeMap[Interval, T] = {
intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime <= endTime)
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala b/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
index dcdc2ff..962f48f 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,12 +21,10 @@ import scala.concurrent._
/**
* Generic storage to store KV Data.
- *
*/
trait AppDataStore {
def put(key: String, value: Any): Future[Any]
- def get(key: String) : Future[Any]
+ def get(key: String): Future[Any]
}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
index 13e3dae..f1a19a8 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,12 +17,13 @@
*/
package io.gearpump.streaming.storage
+import scala.concurrent.Future
+
import akka.actor.ActorRef
import akka.pattern.ask
-import io.gearpump.cluster.AppMasterToMaster.{GetAppDataResult, GetAppData, SaveAppData}
-import io.gearpump.util.Constants
-import scala.concurrent.Future
+import io.gearpump.cluster.AppMasterToMaster.{GetAppData, GetAppDataResult, SaveAppData}
+import io.gearpump.util.Constants
/**
* In memory application storage located on master nodes
@@ -36,8 +37,8 @@ class InMemoryAppStoreOnMaster(appId: Int, master: ActorRef) extends AppDataStor
}
override def get(key: String): Future[Any] = {
- master.ask(GetAppData(appId, key)).asInstanceOf[Future[GetAppDataResult]].map{result =>
- if(result.key.equals(key)) {
+ master.ask(GetAppData(appId, key)).asInstanceOf[Future[GetAppDataResult]].map { result =>
+ if (result.key.equals(key)) {
result.value
} else {
null
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
index 2dbdd14..a3cb6e1 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,16 @@
package io.gearpump.streaming.task
import akka.actor.{ActorRef, ExtendedActorSystem}
-import io.gearpump.serializer.SerializationFramework
+
+import io.gearpump.Message
import io.gearpump.transport.netty.TaskMessage
import io.gearpump.transport.{Express, HostPort}
-import io.gearpump.Message
-
-import scala.collection.mutable
import io.gearpump.util.AkkaHelper
/**
* ExpressTransport wire the networking function from default akka
- * networking to customized implementation [[Express]].
- *
- * See [[Express]] for more information.
+ * networking to customized implementation [[io.gearpump.transport.Express]].
*
+ * See [[io.gearpump.transport.Express]] for more information.
*/
trait ExpressTransport {
this: TaskActor =>
@@ -39,15 +36,15 @@ trait ExpressTransport {
final val express = Express(context.system)
implicit val system = context.system.asInstanceOf[ExtendedActorSystem]
- final def local = express.localHost
+ final def local: HostPort = express.localHost
lazy val sourceId = TaskId.toLong(taskId)
lazy val sessionRef: ActorRef = {
AkkaHelper.actorFor(system, s"/session#$sessionId")
}
- def transport(msg : AnyRef, remotes : TaskId *): Unit = {
- var serializedMessage : AnyRef = null
+ def transport(msg: AnyRef, remotes: TaskId*): Unit = {
+ var serializedMessage: AnyRef = null
remotes.foreach { remote =>
val transportId = TaskId.toLong(remote)
@@ -69,7 +66,8 @@ trait ExpressTransport {
if (remoteAddress.isDefined) {
express.transport(taskMessage, remoteAddress.get)
} else {
- LOG.error(s"Can not find target task $remote, maybe the application is undergoing recovery")
+ LOG.error(
+ s"Can not find target task $remote, maybe the application is undergoing recovery")
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
index ff2ca89..9f9bf1b 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala b/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
index edac7f0..72bc7db 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
@@ -15,19 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.streaming.task
-import io.gearpump.esotericsoftware.kryo.util.ObjectMap
-import io.gearpump.esotericsoftware.kryo.util.IntMap
+import io.gearpump.esotericsoftware.kryo.util.{IntMap, ObjectMap}
import io.gearpump.streaming.task.SerializerResolver.Registration
private[task] class SerializerResolver {
private var classId = 0
- val idToRegistration = new IntMap[Registration]()
- val classToRegistration = new ObjectMap[Class[_], Registration]()
+ private val idToRegistration = new IntMap[Registration]()
+ private val classToRegistration = new ObjectMap[Class[_], Registration]()
- def register(clazz: Class[_], serializer: TaskMessageSerializer[_]): Unit = {
- val registration = Registration(classId, clazz, serializer)
+ def register[T](clazz: Class[T], serializer: TaskMessageSerializer[T]): Unit = {
+ val registration = new Registration(classId, clazz, serializer)
idToRegistration.put(classId, registration)
classToRegistration.put(clazz, registration)
classId += 1
@@ -42,6 +42,6 @@ private[task] class SerializerResolver {
}
}
-object SerializerResolver{
- case class Registration(id: Int, clazz: Class[_], serializer: TaskMessageSerializer[_])
+object SerializerResolver {
+ class Registration(val id: Int, val clazz: Class[_], val serializer: TaskMessageSerializer[_])
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala b/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
index 75e3521..6bc8b15 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
@@ -1,5 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.gearpump.streaming.task
import io.gearpump.TimeStamp
-case class StartTime(startTime : TimeStamp = 0)
+/** Start time of streaming application. All messages older than the start time will be dropped */
+case class StartTime(startTime: TimeStamp = 0)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
index 52b8021..17d0b1b 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,12 +19,13 @@ package io.gearpump.streaming.task
import java.io.{DataInput, DataOutput}
-import io.gearpump.streaming.{LatencyProbeSerializer, InitialAckRequestSerializer, AckRequestSerializer, AckSerializer}
+import org.slf4j.Logger
+
+import io.gearpump.streaming.{AckRequestSerializer, AckSerializer, InitialAckRequestSerializer, LatencyProbeSerializer}
import io.gearpump.transport.netty.ITransportMessageSerializer
import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-class StreamingTransportSerializer extends ITransportMessageSerializer{
+class StreamingTransportSerializer extends ITransportMessageSerializer {
private val log: Logger = LogUtil.getLogger(getClass)
private val serializers = new SerializerResolver
@@ -36,7 +37,7 @@ class StreamingTransportSerializer extends ITransportMessageSerializer{
override def serialize(dataOutput: DataOutput, obj: Object): Unit = {
val registration = serializers.getRegistration(obj.getClass)
- if(registration != null) {
+ if (registration != null) {
dataOutput.writeInt(registration.id)
registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].write(dataOutput, obj)
} else {
@@ -47,7 +48,7 @@ class StreamingTransportSerializer extends ITransportMessageSerializer{
override def deserialize(dataInput: DataInput, length: Int): Object = {
val classID = dataInput.readInt()
val registration = serializers.getRegistration(classID)
- if(registration != null) {
+ if (registration != null) {
registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].read(dataInput)
} else {
log.error(s"Can not find serializer for class id $classID")
@@ -57,7 +58,7 @@ class StreamingTransportSerializer extends ITransportMessageSerializer{
override def getLength(obj: Object): Int = {
val registration = serializers.getRegistration(obj.getClass)
- if(registration != null) {
+ if (registration != null) {
registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].getLength(obj) + 4
} else {
log.error(s"Can not find serializer for class type ${obj.getClass}")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
index 72d46f7..d20074b 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,7 +19,7 @@
package io.gearpump.streaming.task
import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.{LifeTime, DAG}
+import io.gearpump.streaming.{DAG, LifeTime}
/**
* Each processor can have multiple downstream subscribers.
@@ -30,8 +30,8 @@ import io.gearpump.streaming.{LifeTime, DAG}
* @param processorId subscriber processor Id
* @param partitionerDescription subscriber partitioner
*/
-
-case class Subscriber(processorId: Int, partitionerDescription: PartitionerDescription, parallelism: Int, lifeTime: LifeTime)
+case class Subscriber(processorId: Int, partitionerDescription: PartitionerDescription,
+ parallelism: Int, lifeTime: LifeTime)
object Subscriber {
@@ -50,7 +50,8 @@ object Subscriber {
edges.foldLeft(List.empty[Subscriber]) { (list, nodeEdgeNode) =>
val (_, partitioner, downstreamProcessorId) = nodeEdgeNode
val downstreamProcessor = dag.processors(downstreamProcessorId)
- list :+ Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism, downstreamProcessor.life)
+ list :+ Subscriber(downstreamProcessorId, partitioner,
+ downstreamProcessor.parallelism, downstreamProcessor.life)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
index 0b1fa29..155edf4 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,22 @@
package io.gearpump.streaming.task
+import org.slf4j.Logger
+
import io.gearpump.google.common.primitives.Shorts
-import io.gearpump.partitioner.{Partitioner, MulticastPartitioner, UnicastPartitioner}
+import io.gearpump.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
import io.gearpump.streaming.AppMasterToExecutor.MsgLostException
import io.gearpump.streaming.LifeTime
import io.gearpump.streaming.task.Subscription._
import io.gearpump.util.LogUtil
import io.gearpump.{Message, TimeStamp}
-import org.slf4j.Logger
/**
- * This manage the output and message clock for single downstream processor
+ * Manages the output and message clock for a single downstream processor
*
* @param subscriber downstream processor
- * @param maxPendingMessageCount trigger flow control. Should be bigger than maxPendingMessageCountPerAckRequest
+ * @param maxPendingMessageCount trigger flow control. Should be bigger than
+ * maxPendingMessageCountPerAckRequest
* @param ackOnceEveryMessageCount send on AckRequest to the target
*/
class Subscription(
@@ -44,9 +46,10 @@ class Subscription(
ackOnceEveryMessageCount: Int = ONE_ACKREQUEST_EVERY_MESSAGE_COUNT) {
assert(maxPendingMessageCount >= ackOnceEveryMessageCount)
- assert(maxPendingMessageCount < Short.MaxValue / 2)
+ assert(maxPendingMessageCount < Short.MaxValue / 2)
- val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)
+ private val LOG: Logger = LogUtil.getLogger(getClass, app = appId,
+ executor = executorId, task = taskId)
import subscriber.{parallelism, partitionerDescription, processorId}
@@ -62,8 +65,8 @@ class Subscription(
private var life = subscriber.lifeTime
- val partitioner = partitionerDescription.partitionerFactory.partitioner
- val sendFn = partitioner match {
+ private val partitioner = partitionerDescription.partitionerFactory.partitioner
+ private val sendFn = partitioner match {
case up: UnicastPartitioner =>
(msg: Message) => {
val partition = up.getPartition(msg, parallelism, taskId.index)
@@ -74,14 +77,13 @@ class Subscription(
val partitions = mp.getPartitions(msg, parallelism, taskId.index)
partitions.map(partition => sendMessage(msg, partition)).sum
}
-
}
def changeLife(life: LifeTime): Unit = {
this.life = life
}
- def start: Unit = {
+ def start(): Unit = {
val ackRequest = InitialAckRequest(taskId, sessionId)
transport.transport(ackRequest, allTasks: _*)
}
@@ -91,14 +93,16 @@ class Subscription(
}
/**
- * Return how many message is actually sent by this subscription
+ * Returns how many messages are actually sent by this subscription
+ *
* @param msg the message to send
* @param partition the target partition to send message to
* @return 1 if success
*/
def sendMessage(msg: Message, partition: Int): Int = {
- // only send message whose timestamp matches the lifeTime
+ var count = 0
+ // Only sends message whose timestamp matches the lifeTime
if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timestamp)) {
val targetTask = TaskId(processorId, partition)
@@ -117,24 +121,25 @@ class Subscription(
(messageCount(partition) + ackOnceEveryMessageCount) / maxPendingMessageCount) {
sendLatencyProbe(partition)
}
-
- return 1
+ count = 1
+ count
} else {
if (needFlush) {
- flush
+ flush()
}
-
- return 0
+ count = 0
+ count
}
}
private var lastFlushTime: Long = 0L
private val FLUSH_INTERVAL = 5 * 1000 // ms
private def needFlush: Boolean = {
- System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL && Shorts.max(pendingMessageCount: _*) > 0
+ System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL &&
+ Shorts.max(pendingMessageCount: _*) > 0
}
- private def flush: Unit = {
+ private def flush(): Unit = {
lastFlushTime = System.currentTimeMillis()
allTasks.foreach { targetTaskId =>
sendAckRequest(targetTaskId.index)
@@ -142,13 +147,14 @@ class Subscription(
}
private def allTasks: scala.collection.Seq[TaskId] = {
- (0 until parallelism).map {taskIndex =>
+ (0 until parallelism).map { taskIndex =>
TaskId(processorId, taskIndex)
}
}
- /** Handle acknowledge message.
- * Throw MessageLossException if required.
+ /**
+ * Handles an acknowledgement message. Throws MsgLostException if messages were lost.
+ *
* @param ack acknowledge message received
*/
def receiveAck(ack: Ack): Unit = {
@@ -159,7 +165,7 @@ class Subscription(
if (ack.actualReceivedNum == ack.seq) {
if ((ack.seq - candidateMinClockSince(index)).toShort >= 0) {
if (ack.seq == messageCount(index)) {
- // all messages have been acked.
+ // All messages have been acked.
minClockValue(index) = Long.MaxValue
} else {
minClockValue(index) = candidateMinClock(index)
@@ -171,7 +177,8 @@ class Subscription(
pendingMessageCount(ack.taskId.index) = (messageCount(ack.taskId.index) - ack.seq).toShort
updateMaxPendingCount()
} else {
- LOG.error(s"Failed! received ack: $ack, received: ${ack.actualReceivedNum}, sent: ${ack.seq}, try to replay...")
+ LOG.error(s"Failed! received ack: $ack, received: ${ack.actualReceivedNum}, " +
+ s"sent: ${ack.seq}, try to replay...")
throw new MsgLostException
}
}
@@ -181,13 +188,14 @@ class Subscription(
minClockValue.min
}
- def allowSendingMoreMessages() : Boolean = {
+ def allowSendingMoreMessages(): Boolean = {
maxPendingCount < maxPendingMessageCount
}
def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = {
minClockValue.indices.foreach { i =>
- if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0 && allowSendingMoreMessages) {
+ if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0
+ && allowSendingMoreMessages) {
sendAckRequest(i)
sendLatencyProbe(i)
}
@@ -195,7 +203,7 @@ class Subscription(
}
private def sendAckRequest(partition: Int): Unit = {
- // we increment more count for each AckRequest
+ // Increments the message count by an extra amount for each AckRequest
// to throttle the number of unacked AckRequest
incrementMessageCount(partition, ackOnceEveryMessageCount)
val targetTask = TaskId(processorId, partition)
@@ -218,11 +226,10 @@ class Subscription(
val targetTask = TaskId(processorId, partition)
transport.transport(probeLatency, targetTask)
}
-
}
object Subscription {
- //make sure it is smaller than MAX_PENDING_MESSAGE_COUNT
+ // Makes sure it is smaller than MAX_PENDING_MESSAGE_COUNT
final val ONE_ACKREQUEST_EVERY_MESSAGE_COUNT = 100
final val MAX_PENDING_MESSAGE_COUNT = 1000
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
index a176ca7..212a659 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
package io.gearpump.streaming.task
+import scala.concurrent.duration.FiniteDuration
+
import akka.actor.Actor.Receive
import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
+import org.slf4j.Logger
+
import io.gearpump.cluster.UserConfig
import io.gearpump.util.LogUtil
import io.gearpump.{Message, TimeStamp}
-import org.slf4j.Logger
-
-import scala.concurrent.duration.FiniteDuration
/**
* This provides context information for a task.
@@ -36,7 +37,7 @@ trait TaskContext {
def executorId: Int
- def appId : Int
+ def appId: Int
def appName: String
@@ -44,7 +45,7 @@ trait TaskContext {
* The actorRef of AppMaster
* @return application master's actor reference
*/
- def appMaster : ActorRef
+ def appMaster: ActorRef
/**
* The task parallelism
@@ -61,64 +62,62 @@ trait TaskContext {
*/
def parallelism: Int
-
/**
* Please don't use this if possible.
* @return self actor ref
*/
- //TODO: We should remove the self from TaskContext
+ // TODO: We should remove the self from TaskContext
def self: ActorRef
/**
* Please don't use this if possible
* @return the actor system
*/
- //TODO: we should remove this in future
+ // TODO: we should remove this in future
def system: ActorSystem
/**
- * This can be used to output messages to downstream tasks.
- * The data shuffling rule can be decided by Partitioner.
+ * This can be used to output messages to downstream tasks. The data shuffling rule
+ * can be decided by Partitioner.
+ *
* @param msg message to output
*/
- def output(msg : Message) : Unit
-
+ def output(msg: Message): Unit
def actorOf(props: Props): ActorRef
def actorOf(props: Props, name: String): ActorRef
- def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit): Cancellable
+ def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable
/**
* akka.actor.ActorRefProvider.scheduleOnce
+ *
* @param initialDelay the initial delay
* @param f the function to execute after initial delay
* @return the executable
*/
- def scheduleOnce(initialDelay: FiniteDuration)(f: ⇒ Unit): Cancellable
+ def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable
- /**
+ /**
* For managed message(type of Message), the sender only serve as a unique Id,
* It's address is not something meaningful, you should not use this directly
*
* For unmanaged message, the sender represent the sender ActorRef
* @return sender
*/
- def sender: ActorRef
-
+ def sender: ActorRef
/**
- * retrieve upstream min clock from TaskActor
+ * Retrieves upstream min clock from TaskActor
+ *
* @return the min clock
*/
def upstreamMinClock: TimeStamp
-
/**
- * logger is environment dependant, it should be provided by
+ * Logger is environment dependent, so it should be provided by
* containing environment.
- * @return
*/
def logger: Logger
}
@@ -130,28 +129,35 @@ trait TaskInterface {
/**
* Method called with the task is initialized.
- * @param startTime startTime that can be used to decide from when a source producer task should replay the data source, or from when a processor task should recover its checkpoint data in to in-memory state.
+ * @param startTime startTime that can be used to decide from when a source producer task should
+ * replay the data source, or from when a processor task should recover its
+ * checkpoint data into in-memory state.
*/
- def onStart(startTime : StartTime) : Unit
+ def onStart(startTime: StartTime): Unit
- /** Method called for each message received.
- * @param msg message send by upstream tasks
+ /**
+ * Method called for each message received.
+ *
+ * @param msg Message sent by upstream tasks
*/
- def onNext(msg : Message) : Unit
+ def onNext(msg: Message): Unit
- /** Method called when task is under clean up.
+ /**
+ * Method called when task is under clean up.
+ *
* This can be used to cleanup resource when the application finished.
*/
- def onStop() : Unit
+ def onStop(): Unit
/**
- * handler for unmanaged message
- * @return the handler
- */
+ * Handles unmanaged messages
+ *
+ * @return the handler
+ */
def receiveUnManagedMessage: Receive = null
}
-abstract class Task(taskContext : TaskContext, userConf : UserConfig) extends TaskInterface{
+abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends TaskInterface {
import taskContext.{appId, executorId, taskId}
@@ -170,11 +176,11 @@ abstract class Task(taskContext : TaskContext, userConf : UserConfig) extends Ta
*/
protected def sender: ActorRef = taskContext.sender
- def onStart(startTime : StartTime) : Unit = {}
+ def onStart(startTime: StartTime): Unit = {}
- def onNext(msg : Message) : Unit = {}
+ def onNext(msg: Message): Unit = {}
- def onStop() : Unit = {}
+ def onStop(): Unit = {}
override def receiveUnManagedMessage: Receive = {
case msg =>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
index d54fce5..b9b8829 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,6 +22,8 @@ import java.util
import java.util.concurrent.TimeUnit
import akka.actor._
+import org.slf4j.Logger
+
import io.gearpump.cluster.UserConfig
import io.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
import io.gearpump.metrics.Metrics
@@ -31,107 +33,107 @@ import io.gearpump.streaming.ExecutorToAppMaster._
import io.gearpump.streaming.{Constants, ProcessorId}
import io.gearpump.util.{LogUtil, TimeOutScheduler}
import io.gearpump.{Message, TimeStamp}
-import org.slf4j.Logger
/**
*
- * All tasks of Gearpump runs inside a Actor.
- * TaskActor is the Actor container for a task.
+ * All tasks of Gearpump run inside an Actor. TaskActor is the Actor container for a task.
*/
class TaskActor(
val taskId: TaskId,
- val taskContextData : TaskContextData,
- userConf : UserConfig,
+ val taskContextData: TaskContextData,
+ userConf: UserConfig,
val task: TaskWrapper,
inputSerializerPool: SerializationFramework)
- extends Actor with ExpressTransport with TimeOutScheduler{
+ extends Actor with ExpressTransport with TimeOutScheduler {
var upstreamMinClock: TimeStamp = 0L
private var _minClock: TimeStamp = 0L
def serializerPool: SerializationFramework = inputSerializerPool
- import Constants._
- import io.gearpump.streaming.task.TaskActor._
import taskContextData._
+
+ import io.gearpump.streaming.Constants._
+ import io.gearpump.streaming.task.TaskActor._
val config = context.system.settings.config
val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)
- //metrics
+ // Metrics
private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}"
- private val receiveLatency = Metrics(context.system).histogram(s"$metricName:receiveLatency", sampleRate = 1)
+ private val receiveLatency = Metrics(context.system).histogram(
+ s"$metricName:receiveLatency", sampleRate = 1)
private val processTime = Metrics(context.system).histogram(s"$metricName:processTime")
private val sendThroughput = Metrics(context.system).meter(s"$metricName:sendThroughput")
private val receiveThroughput = Metrics(context.system).meter(s"$metricName:receiveThroughput")
private val maxPendingMessageCount = config.getInt(GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT)
- private val ackOnceEveryMessageCount = config.getInt(GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT)
+ private val ackOnceEveryMessageCount = config.getInt(
+ GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT)
private val executor = context.parent
private var life = taskContextData.life
- //latency probe
- import context.dispatcher
-
+ // Latency probe
import scala.concurrent.duration._
+
+ import context.dispatcher
final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
- // clock report interval
+ // Clock report interval
final val CLOCK_REPORT_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
- // flush interval
+ // Flush interval
final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS)
private val queue = new util.LinkedList[AnyRef]()
private var subscriptions = List.empty[(Int, Subscription)]
- // securityChecker will be responsible of dropping messages from
+ // SecurityChecker is responsible for dropping messages from
// unknown sources
- private val securityChecker = new SecurityChecker(taskId, self)
+ private val securityChecker = new SecurityChecker(taskId, self)
private[task] var sessionId = NONE_SESSION
- //report to appMaster with my address
+ // Registers this actor with the express transport, keyed by its TaskId encoded as a Long
express.registerLocalActor(TaskId.toLong(taskId), self)
- final def receive : Receive = null
+ final def receive: Receive = null
task.setTaskActor(this)
- def onStart(startTime : StartTime) : Unit = {
+ def onStart(startTime: StartTime): Unit = {
task.onStart(startTime)
}
- def onNext(msg : Message) : Unit = task.onNext(msg)
+ def onNext(msg: Message): Unit = task.onNext(msg)
def onUnManagedMessage(msg: Any): Unit = task.receiveUnManagedMessage.apply(msg)
- def onStop() : Unit = task.onStop()
+ def onStop(): Unit = task.onStop()
/**
* output to a downstream by specifying a arrayIndex
- * @param arrayIndex, this is not same as ProcessorId
- * @param msg
+ * @param arrayIndex index into the subscription list sorted by processor id; not the same as ProcessorId
*/
- def output(arrayIndex: Int, msg: Message) : Unit = {
+ def output(arrayIndex: Int, msg: Message): Unit = {
var count = 0
- count += this.subscriptions(arrayIndex)._2.sendMessage(msg)
+ count += this.subscriptions(arrayIndex)._2.sendMessage(msg)
sendThroughput.mark(count)
}
- def output(msg : Message) : Unit = {
+ def output(msg: Message): Unit = {
var count = 0
- this.subscriptions.foreach{ subscription =>
+ this.subscriptions.foreach { subscription =>
count += subscription._2.sendMessage(msg)
}
sendThroughput.mark(count)
}
- final override def postStop() : Unit = {
+ final override def postStop(): Unit = {
onStop()
}
- final override def preStart() : Unit = {
+ final override def preStart(): Unit = {
val register = RegisterTask(taskId, executorId, local)
LOG.info(s"$register")
executor ! register
@@ -154,7 +156,7 @@ class TaskActor(
msg match {
case SendAck(ack, targetTask) =>
transport(ack, targetTask)
- case m : Message =>
+ case m: Message =>
count += 1
onNext(m)
case other =>
@@ -172,18 +174,18 @@ class TaskActor(
}
}
- private def onStartClock: Unit = {
+ private def onStartClock(): Unit = {
LOG.info(s"received start, clock: $upstreamMinClock, sessionId: $sessionId")
subscriptions = subscribers.map { subscriber =>
- (subscriber.processorId ,
+ (subscriber.processorId,
new Subscription(appId, executorId, taskId, subscriber, sessionId, this,
maxPendingMessageCount, ackOnceEveryMessageCount))
}.sortBy(_._1)
- subscriptions.foreach(_._2.start)
+ subscriptions.foreach(_._2.start())
import scala.collection.JavaConverters._
- stashQueue.asScala.foreach{item =>
+ stashQueue.asScala.foreach { item =>
handleMessages(item.sender).apply(item.msg)
}
stashQueue.clear()
@@ -198,7 +200,7 @@ class TaskActor(
}
def waitForTaskRegistered: Receive = {
- case start@ TaskRegistered(_, sessionId, startClock) =>
+ case start@TaskRegistered(_, sessionId, startClock) =>
this.sessionId = sessionId
this.upstreamMinClock = startClock
context.become(waitForStartClock)
@@ -206,9 +208,9 @@ class TaskActor(
private val stashQueue = new util.LinkedList[MessageAndSender]()
- def waitForStartClock : Receive = {
+ def waitForStartClock: Receive = {
case start: StartTask =>
- onStartClock
+ onStartClock()
case other: AnyRef =>
stashQueue.add(MessageAndSender(other, sender()))
}
@@ -221,8 +223,9 @@ class TaskActor(
doHandleMessage()
}
case ackRequest: AckRequest =>
- //enqueue to handle the ackRequest and send back ack later
- val ackResponse = securityChecker.generateAckResponse(ackRequest, sender, ackOnceEveryMessageCount)
+ // Enqueue to handle the ackRequest and send back ack later
+ val ackResponse = securityChecker.generateAckResponse(ackRequest, sender,
+ ackOnceEveryMessageCount)
if (null != ackResponse) {
queue.add(SendAck(ackResponse, ackRequest.taskId))
doHandleMessage()
@@ -231,16 +234,17 @@ class TaskActor(
subscriptions.find(_._1 == ack.taskId.processorId).foreach(_._2.receiveAck(ack))
doHandleMessage()
case inputMessage: SerializedMessage =>
- val message = Message(serializerPool.get().deserialize(inputMessage.bytes), inputMessage.timeStamp)
+ val message = Message(serializerPool.get().deserialize(inputMessage.bytes),
+ inputMessage.timeStamp)
receiveMessage(message, sender)
case inputMessage: Message =>
receiveMessage(inputMessage, sender)
- case upstream@ UpstreamMinClock(upstreamClock) =>
+ case upstream@UpstreamMinClock(upstreamClock) =>
this.upstreamMinClock = upstreamClock
val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
val subMin = sub._2.minClock
- // a subscription is holding back the _minClock;
+ // A subscription is holding back the _minClock;
// we send AckRequest to its tasks to push _minClock forward
if (subMin == _minClock) {
sub._2.sendAckRequestOnStallingTime(_minClock)
@@ -255,7 +259,7 @@ class TaskActor(
appMaster ! update
}
- // check whether current task is dead.
+ // Checks whether current task is dead.
if (_minClock > life.death) {
// There will be no more message received...
val unRegister = UnRegisterTask(taskId, executorId)
@@ -273,11 +277,11 @@ class TaskActor(
case Some(subscription) =>
subscription.changeLife(subscriber.lifeTime cross this.life)
case None =>
- val subscription = new Subscription(appId, executorId, taskId, subscriber, sessionId, this,
- maxPendingMessageCount, ackOnceEveryMessageCount)
- subscription.start
- subscriptions :+= (subscriber.processorId, subscription)
- // sort, keep the order
+ val subscription = new Subscription(appId, executorId, taskId, subscriber,
+ sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount)
+ subscription.start()
+ subscriptions :+=(subscriber.processorId, subscription)
+ // Re-sort by processor id so the subscriptions stay ordered
subscriptions = subscriptions.sortBy(_._1)
}
}
@@ -293,12 +297,12 @@ class TaskActor(
}
/**
- * @return min clock of this task
+ * Returns min clock of this task
*/
def minClock: TimeStamp = _minClock
/**
- * @return min clock of upstream task
+ * Returns min clock of upstream task
*/
def getUpstreamMinClock: TimeStamp = upstreamMinClock
@@ -309,8 +313,8 @@ class TaskActor(
queue.add(msg)
doHandleMessage()
case None =>
- //Todo: Indicate the error and avoid the LOG flood
- //LOG.error(s"Task $taskId drop message $msg")
+ // TODO: Indicate the error and avoid the LOG flood
+ // LOG.error(s"Task $taskId drop message $msg")
}
}
@@ -320,28 +324,28 @@ class TaskActor(
}
object TaskActor {
- val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000 //3 seconds
+ // In milliseconds (3 seconds)
+ val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000
// If the message comes from an unknown source, securityChecker will drop it
- class SecurityChecker(task_id: TaskId, self : ActorRef) {
+ class SecurityChecker(task_id: TaskId, self: ActorRef) {
private val LOG: Logger = LogUtil.getLogger(getClass, task = task_id)
- // Use mutable HashMap for performance optimization
+ // Uses mutable HashMap for performance optimization
private val receivedMsgCount = new IntShortHashMap()
-
// Tricky performance optimization to save memory.
// We store the session Id in the uid of ActorPath
// ActorPath.hashCode is same as uid.
private def getSessionId(actor: ActorRef): Int = {
- //TODO: As method uid is protected in [akka] package. We
+ // TODO: As method uid is protected in [akka] package. We
// are using hashCode instead of uid.
actor.hashCode()
}
def handleInitialAckRequest(ackRequest: InitialAckRequest): Ack = {
- LOG.debug(s"Handle InitialAckRequest for session $ackRequest" )
+ LOG.debug(s"Handle InitialAckRequest for session $ackRequest")
val sessionId = ackRequest.sessionId
if (sessionId == NONE_SESSION) {
LOG.error(s"SessionId is not initialized, ackRequest: $ackRequest")
@@ -355,7 +359,7 @@ object TaskActor {
def generateAckResponse(ackRequest: AckRequest, sender: ActorRef, incrementCount: Int): Ack = {
val sessionId = ackRequest.sessionId
if (receivedMsgCount.containsKey(sessionId)) {
- // we increment more count for each AckRequest
+ // Increments the count by incrementCount for each AckRequest
// to throttle the number of unacked AckRequest
receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort)
Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId)
@@ -366,8 +370,8 @@ object TaskActor {
}
// If the message comes from an unknown source, then drop it
- def checkMessage(message : Message, sender: ActorRef): Option[Message] = {
- if(sender.equals(self)){
+ def checkMessage(message: Message, sender: ActorRef): Option[Message] = {
+ if (sender.equals(self)) {
Some(message)
} else {
val sessionId = getSessionId(sender)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
index ce32606..28605cf 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
@@ -19,13 +19,14 @@
package io.gearpump.streaming.task
import akka.actor.ActorRef
+
import io.gearpump.streaming.LifeTime
case class TaskContextData(
- executorId : Int,
- appId : Int,
+ executorId: Int,
+ appId: Int,
appName: String,
- appMaster : ActorRef,
+ appMaster: ActorRef,
parallelism: Int,
life: LifeTime,
subscribers: List[Subscriber])
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
index 59babff..d89387a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskControlMessage.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -32,9 +32,11 @@ case class InitialAckRequest(taskId: TaskId, sessionId: Int)
*/
case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int)
-/*
- Here the seq field represents the expected number of received messages
- and the actualReceivedNum field means the actual received number since start
+/**
+ * Acknowledgement sent back to the sender task actor.
+ *
+ * @param seq the expected number of received messages.
+ * @param actualReceivedNum the actual number of messages received since start.
*/
case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int)
@@ -60,6 +62,7 @@ case object GetStartClock
case class StartClock(clock: TimeStamp)
+/** Probes the latency between an upstream task and a downstream task. */
case class LatencyProbe(timestamp: Long)
case class SendMessageLoss()
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
index f599540..66c3c52 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskId.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,9 +20,9 @@ package io.gearpump.streaming.task
import io.gearpump.streaming._
-case class TaskId(processorId : ProcessorId, index : TaskIndex)
+case class TaskId(processorId: ProcessorId, index: TaskIndex)
object TaskId {
- def toLong(id : TaskId) = (id.processorId.toLong << 32) + id.index
- def fromLong(id : Long) = TaskId(((id >> 32) & 0xFFFFFFFF).toInt, (id & 0xFFFFFFFF).toInt)
+ def toLong(id: TaskId): Long = (id.processorId.toLong << 32) + id.index
+ def fromLong(id: Long): TaskId = TaskId(((id >> 32) & 0xFFFFFFFF).toInt, (id & 0xFFFFFFFF).toInt)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
index 370eef7..500f8b3 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskMessageSerializer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,8 +19,6 @@ package io.gearpump.streaming.task
import java.io.{DataInput, DataOutput}
-import org.jboss.netty.buffer.ChannelBuffer
-
trait TaskMessageSerializer[T] {
def write(dataOutput: DataOutput, obj: T)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
index c6564ff..040fc2e 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
@@ -21,11 +21,12 @@ package io.gearpump.streaming.task
object TaskUtil {
/**
- * Resolve a classname to a Task class.
+ * Resolves a classname to a Task class.
+ *
* @param className the class name to resolve
* @return resolved class
*/
- def loadClass(className: String): Class[_<:Task] = {
+ def loadClass(className: String): Class[_ <: Task] = {
val loader = Thread.currentThread().getContextClassLoader()
loader.loadClass(className).asSubclass(classOf[Task])
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
index 3be396f..e7e883c 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskWrapper.scala
@@ -18,22 +18,26 @@
package io.gearpump.streaming.task
+import scala.concurrent.duration.FiniteDuration
+
import akka.actor.Actor._
import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
-import io.gearpump.{TimeStamp, Message}
-import io.gearpump.cluster.UserConfig
-import io.gearpump.util.LogUtil
import org.slf4j.Logger
-import scala.concurrent.duration.FiniteDuration
+import io.gearpump.cluster.UserConfig
+import io.gearpump.util.LogUtil
+import io.gearpump.{Message, TimeStamp}
/**
* This provides TaskContext for user defined tasks
+ *
* @param taskClass task class
* @param context context class
* @param userConf user config
*/
-class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData, userConf: UserConfig) extends TaskContext with TaskInterface {
+class TaskWrapper(
+ val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData,
+ userConf: UserConfig) extends TaskContext with TaskInterface {
private val LOG = LogUtil.getLogger(taskClass, task = taskId)
@@ -56,18 +60,19 @@ class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: Task
override def output(msg: Message): Unit = actor.output(msg)
/**
- * @see [[TaskActor.output]]
+ * See [[io.gearpump.streaming.task.TaskActor]] output(arrayIndex: Int, msg: Message): Unit
+ *
* @param index, not same as ProcessorId
- * @param msg
*/
def output(index: Int, msg: Message): Unit = actor.output(index, msg)
/**
* Use with caution, output unmanaged message to target tasks
+ *
* @param msg message to output
* @param tasks the tasks to output to
*/
- def outputUnManaged(msg: AnyRef, tasks: TaskId *): Unit = {
+ def outputUnManaged(msg: AnyRef, tasks: TaskId*): Unit = {
actor.transport(msg, tasks: _*)
}
@@ -105,12 +110,12 @@ class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: Task
actor.getUpstreamMinClock
}
- def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit): Cancellable = {
+ def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable = {
val dispatcher = actor.context.system.dispatcher
actor.context.system.scheduler.schedule(initialDelay, interval)(f)(dispatcher)
}
- def scheduleOnce(initialDelay: FiniteDuration)(f: ⇒ Unit): Cancellable = {
+ def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable = {
val dispatcher = actor.context.system.dispatcher
actor.context.system.scheduler.scheduleOnce(initialDelay)(f)(dispatcher)
}
@@ -121,9 +126,8 @@ class TaskWrapper(val taskId: TaskId, taskClass: Class[_ <: Task], context: Task
}
/**
- * logger is environment dependant, it should be provided by
+ * Logger is environment dependent; it should be provided by the
* containing environment.
- * @return
*/
override def logger: Logger = LOG
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
index 7dd08fb..f3894ea 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/CheckpointStore.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,9 +18,9 @@
package io.gearpump.streaming.transaction.api
-import io.gearpump.streaming.task.TaskContext
import io.gearpump.TimeStamp
import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.TaskContext
/**
* CheckpointStore persistently stores mapping of timestamp to checkpoint