You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/09/05 07:39:14 UTC
incubator-gearpump git commit: [GEARPUMP-201] integration test failure
Repository: incubator-gearpump
Updated Branches:
refs/heads/master a743a9ca6 -> 5bf7c7cb6
[GEARPUMP-201] integration test failure
Author: manuzhang <ow...@gmail.com>
Closes #79 from manuzhang/fix_it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5bf7c7cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5bf7c7cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5bf7c7cb
Branch: refs/heads/master
Commit: 5bf7c7cb606aca8c8ffc93375b8c6316f8f6624c
Parents: a743a9c
Author: manuzhang <ow...@gmail.com>
Authored: Mon Sep 5 15:39:04 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Sep 5 15:39:04 2016 +0800
----------------------------------------------------------------------
.../streaming/examples/wordcount/Split.scala | 14 +++------
.../examples/wordcount/WordCount.scala | 12 +++----
.../examples/wordcount/WordCountSpec.scala | 2 +-
.../checklist/DynamicDagSpec.scala | 33 ++++++++++----------
4 files changed, 27 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
index ad6f41a..c07e124 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
@@ -29,14 +29,12 @@ import scala.collection.mutable.ArrayBuffer
class Split extends DataSource {
- val result = ArrayBuffer[Message]()
- var item = -1
+ private val result = ArrayBuffer[String]()
+ private var item = -1
Split.TEXT_TO_SPLIT.lines.foreach { line =>
line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
- result.append(new Message(msg, System.currentTimeMillis()))
-
+ result.append(msg)
}
-
}
override def open(context: TaskContext, startTime: Instant): Unit = {}
@@ -45,10 +43,10 @@ class Split extends DataSource {
if (item < result.size - 1) {
item += 1
- result(item)
+ Message(result(item), System.currentTimeMillis())
} else {
item = 0
- result(item)
+ Message(result(item), System.currentTimeMillis())
}
}
@@ -57,10 +55,8 @@ class Split extends DataSource {
override def getWatermark: Instant = Instant.now()
-
}
-
object Split {
val TEXT_TO_SPLIT =
"""
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
index 99b83ad..9580e63 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -36,7 +36,7 @@ object WordCount extends AkkaApp with ArgumentsParser {
val RUN_FOR_EVER = -1
override val options: Array[(String, CLIOption[Any])] = Array(
- "source" -> CLIOption[Int]("<how many source tasks>", required = false,
+ "split" -> CLIOption[Int]("<how many source tasks>", required = false,
defaultValue = Some(1)),
"sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)),
"debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)),
@@ -48,14 +48,12 @@ object WordCount extends AkkaApp with ArgumentsParser {
implicit val actorSystem = system
val sumNum = config.getInt("sum")
- val sourceNum = config.getInt("source")
- val source = new Split
- val sourceProcessor = DataSourceProcessor(source, sourceNum)
+ val splitNum = config.getInt("split")
+ val split = new Split
+ val sourceProcessor = DataSourceProcessor(split, splitNum, "Split")
val sum = Processor[Sum](sumNum)
val partitioner = new HashPartitioner
- val computation = sourceProcessor ~ partitioner ~>
- sum
-
+ val computation = sourceProcessor ~ partitioner ~> sum
val app = StreamApplication("wordCount", Graph(computation), UserConfig.empty)
app
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
index 5121815..f703552 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
@@ -44,7 +44,7 @@ class WordCountSpec
property("WordCount should succeed to submit application with required arguments") {
val requiredArgs = Array.empty[String]
val optionalArgs = Array(
- "-source", "1",
+ "-split", "1",
"-sum", "1")
val args = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
index 81e7b2a..89b8ef7 100644
--- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
@@ -24,10 +24,9 @@ import org.apache.gearpump.streaming.appmaster.ProcessorSummary
class DynamicDagSpec extends TestSpecBase {
+ val sourceTaskClass = "org.apache.gearpump.streaming.examples.sol.SOLStreamProducer"
+ val sinkTaskClass = "org.apache.gearpump.streaming.examples.sol.SOLStreamProcessor"
lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head
- val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split"
- val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum"
- val solName = "sol"
"dynamic dag" should {
"can retrieve a list of built-in partitioner classes" in {
@@ -42,13 +41,13 @@ class DynamicDagSpec extends TestSpecBase {
// todo: blocked by #1450
}
- "can replace downstream with wordcount's sum processor (new processor will have metrics)" in {
+ "can replace downstream with SOLStreamProcessor (new processor will have metrics)" in {
// setup
- val appId = expectSolJarSubmittedWithAppId()
+ val appId = expectWordCountJarSubmittedWithAppId()
// exercise
val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
- replaceProcessor(appId, 1, sumTaskClass)
+ replaceProcessor(appId, 1, sinkTaskClass)
var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
Util.retryUntil(() => {
laterProcessors = restClient.queryStreamingAppDetail(appId).processors
@@ -57,13 +56,13 @@ class DynamicDagSpec extends TestSpecBase {
processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
}
- "can replace upstream with wordcount's split processor (new processor will have metrics)" in {
+ "can replace upstream with SOLStreamProducer (new processor will have metrics)" in {
// setup
- val appId = expectSolJarSubmittedWithAppId()
+ val appId = expectWordCountJarSubmittedWithAppId()
// exercise
val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
- replaceProcessor(appId, 0, splitTaskClass)
+ replaceProcessor(appId, 0, sourceTaskClass)
var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
Util.retryUntil(() => {
laterProcessors = restClient.queryStreamingAppDetail(appId).processors
@@ -74,11 +73,11 @@ class DynamicDagSpec extends TestSpecBase {
"fall back to last dag version when replacing a processor failid" in {
// setup
- val appId = expectSolJarSubmittedWithAppId()
+ val appId = expectWordCountJarSubmittedWithAppId()
// exercise
val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
- replaceProcessor(appId, 1, sumTaskClass)
+ replaceProcessor(appId, 1, sinkTaskClass)
var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
Util.retryUntil(() => {
laterProcessors = restClient.queryStreamingAppDetail(appId).processors
@@ -99,12 +98,12 @@ class DynamicDagSpec extends TestSpecBase {
"fall back to last dag version when AppMaster HA triggered" in {
// setup
- val appId = expectSolJarSubmittedWithAppId()
+ val appId = expectWordCountJarSubmittedWithAppId()
// exercise
val formerAppMaster = restClient.queryApp(appId).appMasterPath
val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
- replaceProcessor(appId, 1, sumTaskClass)
+ replaceProcessor(appId, 1, sinkTaskClass)
var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
Util.retryUntil(() => {
laterProcessors = restClient.queryStreamingAppDetail(appId).processors
@@ -120,11 +119,11 @@ class DynamicDagSpec extends TestSpecBase {
}
}
- private def expectSolJarSubmittedWithAppId(): Int = {
+ private def expectWordCountJarSubmittedWithAppId(): Int = {
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe true
- expectAppIsRunning(appId, solName)
+ expectAppIsRunning(appId, wordCountName)
Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
appId
}
@@ -135,7 +134,7 @@ class DynamicDagSpec extends TestSpecBase {
newTaskClass: String,
newProcessorDescription: String = "",
newParallelism: Int = 1): Unit = {
- val uploadedJar = restClient.uploadJar(wordCountJar)
+ val uploadedJar = restClient.uploadJar(solJar)
val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass,
newParallelism, newProcessorDescription,
jar = uploadedJar)