You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/01/14 11:57:06 UTC
incubator-gearpump git commit: [GEARPUMP-262] Invoke setup/teardown methods of SingleInputFunction in all tasks
Repository: incubator-gearpump
Updated Branches:
refs/heads/master a23a40f5e -> 636cd6f8e
[GEARPUMP-262] Invoke setup/teardown methods of SingleInputFunction in all tasks
Author: manuzhang <ow...@gmail.com>
Closes #132 from manuzhang/setup_teardown.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/636cd6f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/636cd6f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/636cd6f8
Branch: refs/heads/master
Commit: 636cd6f8ef566260932848d2cc1b6c77fd8c90b3
Parents: a23a40f
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 14 19:56:17 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 19:56:29 2017 +0800
----------------------------------------------------------------------
.../streaming/dsl/task/TransformTask.scala | 10 +++
.../streaming/source/DataSourceTask.scala | 2 +
.../streaming/dsl/task/TransformTaskSpec.scala | 75 ++++++++++++++++++++
.../streaming/source/DataSourceTaskSpec.scala | 12 +++-
4 files changed, 96 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index c13a4fb..f8fbefa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -17,6 +17,8 @@
*/
package org.apache.gearpump.streaming.dsl.task
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
@@ -31,6 +33,10 @@ class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
}
+ override def onStart(startTime: Instant): Unit = {
+ operator.foreach(_.setup())
+ }
+
override def onNext(msg: Message): Unit = {
val time = msg.timestamp
@@ -43,4 +49,8 @@ class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
taskContext.output(new Message(msg.msg, time))
}
}
+
+ override def onStop(): Unit = {
+ operator.foreach(_.teardown())
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 535497c..450f2d6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -69,6 +69,7 @@ class DataSourceTask[IN, OUT] private[source](
override def onStart(startTime: Instant): Unit = {
LOG.info(s"opening data source at $startTime")
source.open(context, startTime)
+ operator.foreach(_.setup())
self ! Watermark(source.getWatermark)
}
@@ -82,6 +83,7 @@ class DataSourceTask[IN, OUT] private[source](
}
override def onStop(): Unit = {
+ operator.foreach(_.teardown())
LOG.info("closing data source...")
source.close()
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
new file mode 100644
index 0000000..b6e7342
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.mockito.Mockito.{verify, when}
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  property("TransformTask.onStart should call SingleInputFunction.setup") {
+    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
+      val taskContext = MockUtil.mockTaskContext
+      implicit val system = MockUtil.system
+      val config = UserConfig.empty
+      val operator = mock[SingleInputFunction[Any, Any]]
+      val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
+      // Starting the task must initialize the wrapped operator.
+      task.onStart(startTime)
+
+      verify(operator).setup()
+    }
+  }
+
+  property("TransformTask.onNext should call SingleInputFunction.process") {
+    forAll(Gen.alphaStr) { (str: String) =>
+      val taskContext = MockUtil.mockTaskContext
+      implicit val system = MockUtil.system
+      val config = UserConfig.empty
+      val operator = mock[SingleInputFunction[Any, Any]]
+      val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
+      val msg = Message(str)
+      when(operator.process(str)).thenReturn(Some(str))
+      // The stubbed operator echoes its input, so the same message should be emitted.
+      task.onNext(msg)
+
+      verify(taskContext).output(msg)
+    }
+  }
+
+  property("TransformTask.onStop should call SingleInputFunction.teardown") {
+    val taskContext = MockUtil.mockTaskContext
+    implicit val system = MockUtil.system
+    val config = UserConfig.empty
+    val operator = mock[SingleInputFunction[Any, Any]]
+    val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
+    // Stopping the task must release the wrapped operator.
+    task.onStop()
+
+    verify(operator).teardown()
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index c786047..4e95bdd 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,6 +23,7 @@ import java.time.Instant
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
@@ -38,11 +39,13 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-
- val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
+ val operator = mock[SingleInputFunction[Any, Any]]
+ val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator))
sourceTask.onStart(startTime)
+
verify(dataSource).open(taskContext, startTime)
+ verify(operator).setup()
}
}
@@ -69,9 +72,12 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
+ val operator = mock[SingleInputFunction[Any, Any]]
+ val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator))
sourceTask.onStop()
+
verify(dataSource).close()
+ verify(operator).teardown()
}
}