You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/09/05 07:39:14 UTC

incubator-gearpump git commit: [GEARPUMP-201] integration test failure

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master a743a9ca6 -> 5bf7c7cb6


[GEARPUMP-201]  integration test failure

Author: manuzhang <ow...@gmail.com>

Closes #79 from manuzhang/fix_it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5bf7c7cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5bf7c7cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5bf7c7cb

Branch: refs/heads/master
Commit: 5bf7c7cb606aca8c8ffc93375b8c6316f8f6624c
Parents: a743a9c
Author: manuzhang <ow...@gmail.com>
Authored: Mon Sep 5 15:39:04 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Sep 5 15:39:04 2016 +0800

----------------------------------------------------------------------
 .../streaming/examples/wordcount/Split.scala    | 14 +++------
 .../examples/wordcount/WordCount.scala          | 12 +++----
 .../examples/wordcount/WordCountSpec.scala      |  2 +-
 .../checklist/DynamicDagSpec.scala              | 33 ++++++++++----------
 4 files changed, 27 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
index ad6f41a..c07e124 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
@@ -29,14 +29,12 @@ import scala.collection.mutable.ArrayBuffer
 
 class Split extends DataSource {
 
-  val result = ArrayBuffer[Message]()
-  var item = -1
+  private val result = ArrayBuffer[String]()
+  private var item = -1
   Split.TEXT_TO_SPLIT.lines.foreach { line =>
     line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
-      result.append(new Message(msg, System.currentTimeMillis()))
-
+      result.append(msg)
     }
-
   }
 
   override def open(context: TaskContext, startTime: Instant): Unit = {}
@@ -45,10 +43,10 @@ class Split extends DataSource {
 
     if (item < result.size - 1) {
       item += 1
-      result(item)
+      Message(result(item), System.currentTimeMillis())
     } else {
       item = 0
-      result(item)
+      Message(result(item), System.currentTimeMillis())
     }
 
   }
@@ -57,10 +55,8 @@ class Split extends DataSource {
 
   override def getWatermark: Instant = Instant.now()
 
-
 }
 
-
 object Split {
   val TEXT_TO_SPLIT =
     """

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
index 99b83ad..9580e63 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -36,7 +36,7 @@ object WordCount extends AkkaApp with ArgumentsParser {
   val RUN_FOR_EVER = -1
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    "source" -> CLIOption[Int]("<how many source tasks>", required = false,
+    "split" -> CLIOption[Int]("<how many source tasks>", required = false,
       defaultValue = Some(1)),
     "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)),
     "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)),
@@ -48,14 +48,12 @@ object WordCount extends AkkaApp with ArgumentsParser {
     implicit val actorSystem = system
 
     val sumNum = config.getInt("sum")
-    val sourceNum = config.getInt("source")
-    val source = new Split
-    val sourceProcessor = DataSourceProcessor(source, sourceNum)
+    val splitNum = config.getInt("split")
+    val split = new Split
+    val sourceProcessor = DataSourceProcessor(split, splitNum, "Split")
     val sum = Processor[Sum](sumNum)
     val partitioner = new HashPartitioner
-    val computation = sourceProcessor ~ partitioner ~>
-      sum
-
+    val computation = sourceProcessor ~ partitioner ~> sum
     val app = StreamApplication("wordCount", Graph(computation), UserConfig.empty)
     app
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
index 5121815..f703552 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
@@ -44,7 +44,7 @@ class WordCountSpec
   property("WordCount should succeed to submit application with required arguments") {
     val requiredArgs = Array.empty[String]
     val optionalArgs = Array(
-      "-source", "1",
+      "-split", "1",
       "-sum", "1")
 
     val args = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
index 81e7b2a..89b8ef7 100644
--- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
@@ -24,10 +24,9 @@ import org.apache.gearpump.streaming.appmaster.ProcessorSummary
 
 class DynamicDagSpec extends TestSpecBase {
 
+  val sourceTaskClass = "org.apache.gearpump.streaming.examples.sol.SOLStreamProducer"
+  val sinkTaskClass = "org.apache.gearpump.streaming.examples.sol.SOLStreamProcessor"
   lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head
-  val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split"
-  val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum"
-  val solName = "sol"
 
   "dynamic dag" should {
     "can retrieve a list of built-in partitioner classes" in {
@@ -42,13 +41,13 @@ class DynamicDagSpec extends TestSpecBase {
       // todo: blocked by #1450
     }
 
-    "can replace downstream with wordcount's sum processor (new processor will have metrics)" in {
+    "can replace downstream with SOLStreamProcessor (new processor will have metrics)" in {
       // setup
-      val appId = expectSolJarSubmittedWithAppId()
+      val appId = expectWordCountJarSubmittedWithAppId()
 
       // exercise
       val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 1, sumTaskClass)
+      replaceProcessor(appId, 1, sinkTaskClass)
       var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
       Util.retryUntil(() => {
         laterProcessors = restClient.queryStreamingAppDetail(appId).processors
@@ -57,13 +56,13 @@ class DynamicDagSpec extends TestSpecBase {
       processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
     }
 
-    "can replace upstream with wordcount's split processor (new processor will have metrics)" in {
+    "can replace upstream with SOLStreamProducer (new processor will have metrics)" in {
       // setup
-      val appId = expectSolJarSubmittedWithAppId()
+      val appId = expectWordCountJarSubmittedWithAppId()
 
       // exercise
       val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 0, splitTaskClass)
+      replaceProcessor(appId, 0, sourceTaskClass)
       var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
       Util.retryUntil(() => {
         laterProcessors = restClient.queryStreamingAppDetail(appId).processors
@@ -74,11 +73,11 @@ class DynamicDagSpec extends TestSpecBase {
 
     "fall back to last dag version when replacing a processor failid" in {
       // setup
-      val appId = expectSolJarSubmittedWithAppId()
+      val appId = expectWordCountJarSubmittedWithAppId()
 
       // exercise
       val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 1, sumTaskClass)
+      replaceProcessor(appId, 1, sinkTaskClass)
       var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
       Util.retryUntil(() => {
         laterProcessors = restClient.queryStreamingAppDetail(appId).processors
@@ -99,12 +98,12 @@ class DynamicDagSpec extends TestSpecBase {
 
     "fall back to last dag version when AppMaster HA triggered" in {
       // setup
-      val appId = expectSolJarSubmittedWithAppId()
+      val appId = expectWordCountJarSubmittedWithAppId()
 
       // exercise
       val formerAppMaster = restClient.queryApp(appId).appMasterPath
       val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 1, sumTaskClass)
+      replaceProcessor(appId, 1, sinkTaskClass)
       var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
       Util.retryUntil(() => {
         laterProcessors = restClient.queryStreamingAppDetail(appId).processors
@@ -120,11 +119,11 @@ class DynamicDagSpec extends TestSpecBase {
     }
   }
 
-  private def expectSolJarSubmittedWithAppId(): Int = {
+  private def expectWordCountJarSubmittedWithAppId(): Int = {
     val appId = restClient.getNextAvailableAppId()
-    val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
+    val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
     success shouldBe true
-    expectAppIsRunning(appId, solName)
+    expectAppIsRunning(appId, wordCountName)
     Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
     appId
   }
@@ -135,7 +134,7 @@ class DynamicDagSpec extends TestSpecBase {
       newTaskClass: String,
       newProcessorDescription: String = "",
       newParallelism: Int = 1): Unit = {
-    val uploadedJar = restClient.uploadJar(wordCountJar)
+    val uploadedJar = restClient.uploadJar(solJar)
     val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass,
       newParallelism, newProcessorDescription,
       jar = uploadedJar)