You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/01/14 11:57:06 UTC

incubator-gearpump git commit: [GEARPUMP-262] Invoke setup/teardown methods of SingleInputFunction i…

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master a23a40f5e -> 636cd6f8e


[GEARPUMP-262] Invoke setup/teardown methods of SingleInputFunction i\u2026

\u2026n all tasks

Author: manuzhang <ow...@gmail.com>

Closes #132 from manuzhang/setup_teardown.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/636cd6f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/636cd6f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/636cd6f8

Branch: refs/heads/master
Commit: 636cd6f8ef566260932848d2cc1b6c77fd8c90b3
Parents: a23a40f
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 14 19:56:17 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 19:56:29 2017 +0800

----------------------------------------------------------------------
 .../streaming/dsl/task/TransformTask.scala      | 10 +++
 .../streaming/source/DataSourceTask.scala       |  2 +
 .../streaming/dsl/task/TransformTaskSpec.scala  | 75 ++++++++++++++++++++
 .../streaming/source/DataSourceTaskSpec.scala   | 12 +++-
 4 files changed, 96 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index c13a4fb..f8fbefa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming.dsl.task
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
@@ -31,6 +33,10 @@ class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
       GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
   }
 
+  override def onStart(startTime: Instant): Unit = {
+    operator.foreach(_.setup())
+  }
+
   override def onNext(msg: Message): Unit = {
     val time = msg.timestamp
 
@@ -43,4 +49,8 @@ class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
         taskContext.output(new Message(msg.msg, time))
     }
   }
+
+  override def onStop(): Unit = {
+    operator.foreach(_.teardown())
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 535497c..450f2d6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -69,6 +69,7 @@ class DataSourceTask[IN, OUT] private[source](
   override def onStart(startTime: Instant): Unit = {
     LOG.info(s"opening data source at $startTime")
     source.open(context, startTime)
+    operator.foreach(_.setup())
 
     self ! Watermark(source.getWatermark)
   }
@@ -82,6 +83,7 @@ class DataSourceTask[IN, OUT] private[source](
   }
 
   override def onStop(): Unit = {
+    operator.foreach(_.teardown())
     LOG.info("closing data source...")
     source.close()
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
new file mode 100644
index 0000000..b6e7342
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.mockito.Mockito.{verify, when}
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  property("TransformTask.onStart should call SingleInputFunction.setup") {
+    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
+      val taskContext = MockUtil.mockTaskContext
+      implicit val system = MockUtil.system
+      val config = UserConfig.empty
+      val operator = mock[SingleInputFunction[Any, Any]]
+      val sourceTask = new TransformTask[Any, Any](Some(operator), taskContext, config)
+
+      sourceTask.onStart(startTime)
+
+      verify(operator).setup()
+    }
+  }
+
+  property("TransformTask.onNext should call SingleInputFunction.process") {
+    forAll(Gen.alphaStr) { (str: String) =>
+      val taskContext = MockUtil.mockTaskContext
+      implicit val system = MockUtil.system
+      val config = UserConfig.empty
+      val operator = mock[SingleInputFunction[Any, Any]]
+      val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
+      val msg = Message(str)
+      when(operator.process(str)).thenReturn(Some(str))
+
+      task.onNext(msg)
+
+      verify(taskContext).output(msg)
+    }
+  }
+
+  property("DataSourceTask.onStop should call SingleInputFunction.setup") {
+    val taskContext = MockUtil.mockTaskContext
+    implicit val system = MockUtil.system
+    val config = UserConfig.empty
+    val operator = mock[SingleInputFunction[Any, Any]]
+    val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
+
+    task.onStop()
+
+    verify(operator).teardown()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index c786047..4e95bdd 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,6 +23,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -38,11 +39,13 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
       val dataSource = mock[DataSource]
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-
-      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
+      val operator = mock[SingleInputFunction[Any, Any]]
+      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator))
 
       sourceTask.onStart(startTime)
+
       verify(dataSource).open(taskContext, startTime)
+      verify(operator).setup()
     }
   }
 
@@ -69,9 +72,12 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
     val dataSource = mock[DataSource]
     val config = UserConfig.empty
       .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-    val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None)
+    val operator = mock[SingleInputFunction[Any, Any]]
+    val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator))
 
     sourceTask.onStop()
+
     verify(dataSource).close()
+    verify(operator).teardown()
   }
 }