You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ap...@apache.org on 2016/04/11 20:26:42 UTC

[40/50] incubator-gearpump git commit: fix #1975, fix storm integration test

fix #1975, fix storm integration test


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/920ad262
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/920ad262
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/920ad262

Branch: refs/heads/master
Commit: 920ad262f9f9e88611d520df9b40d1ff8363f81c
Parents: 42806a9
Author: manuzhang <ow...@gmail.com>
Authored: Tue Feb 2 16:58:00 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Feb 24 16:37:01 2016 +0800

----------------------------------------------------------------------
 .../experiments/storm/main/GearpumpNimbus.scala | 27 ++++++----
 .../storm/main/GearpumpStormClient.scala        |  2 +-
 .../checklist/StormCompatibilitySpec.scala      | 51 +++++++++++--------
 .../suites/StandaloneModeSuite.scala            |  4 +-
 .../integrationtest/storm/StormClient.scala     | 52 +++++++++-----------
 project/Build.scala                             |  3 +-
 .../streaming/appmaster/ClockService.scala      |  2 -
 7 files changed, 74 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
index 6d5b0aa..e9973e5 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -39,14 +39,15 @@ import io.gearpump.streaming.StreamApplication
 import io.gearpump.util.{AkkaApp, Constants, LogUtil}
 import org.apache.storm.shade.org.json.simple.JSONValue
 import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
-import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor
 import org.slf4j.Logger
 
 import scala.collection.JavaConverters._
+import scala.concurrent.Future
 
 object GearpumpNimbus extends AkkaApp with ArgumentsParser {
   private val THRIFT_PORT = StormUtil.getThriftPort
   private val OUTPUT = "output"
+  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
 
   override val options: Array[(String, CLIOption[Any])] = Array(
     OUTPUT -> CLIOption[String]("<output path for configuration file>", required = false, defaultValue = Some("app.yaml"))
@@ -57,15 +58,20 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
     val output = parsed.getString(OUTPUT)
     val akkaConf = updateClientConfig(inputAkkaConf)
     val system = ActorSystem("storm", akkaConf)
+
     val clientContext = new ClientContext(akkaConf, system, null)
     val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
-    val thriftConf: JMap[String, String] = Map(
-      Config.NIMBUS_HOST -> ("" + akkaConf.getString(Constants.GEARPUMP_HOSTNAME)),
-      Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava
+    val thriftConf: JMap[AnyRef, AnyRef] = Map(
+      Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME),
+      Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava.asInstanceOf[JMap[AnyRef, AnyRef]]
     updateStormConfig(thriftConf, output)
     stormConf.putAll(thriftConf)
-    val thriftServer = createServer(clientContext, stormConf)
-    thriftServer.serve()
+
+    import scala.concurrent.ExecutionContext.Implicits.global
+    Future {
+      val thriftServer = createServer(clientContext, stormConf)
+      thriftServer.serve()
+    }
     system.awaitTermination()
   }
 
@@ -75,12 +81,12 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
     new ThriftServer(stormConf, processor, connectionType)
   }
 
-  private def updateStormConfig(thriftConfig: JMap[String, String], output: String): Unit = {
+  private def updateStormConfig(thriftConfig: JMap[AnyRef, AnyRef], output: String): Unit = {
     val updatedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
     val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]]
     updatedConfig.putAll(outputConfig)
     updatedConfig.putAll(thriftConfig)
-    val yaml = new Yaml(new SafeConstructor)
+    val yaml = new Yaml
     val serialized = yaml.dumpAsMap(updatedConfig)
     val writer = new FileWriter(new File(output))
     try {
@@ -112,7 +118,8 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
 }
 
 class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) extends Nimbus.Iface {
-  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
+  import io.gearpump.experiments.storm.main.GearpumpNimbus._
+
   private var applications = Map.empty[String, Int]
   private var topologies = Map.empty[String, TopologyData]
   private val expireSeconds = StormUtil.getInt(stormConf, Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get
@@ -128,7 +135,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe
   }
 
   override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = {
-
+    LOG.info(s"Submitted topology $name")
     implicit val system = clientContext.system
     val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
     val stormConfig = gearpumpStormTopology.getStormConfig

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
index 643e926..d4c0e8a 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
@@ -46,7 +46,7 @@ object GearpumpStormClient extends AkkaApp with ArgumentsParser {
     val stormOptions = Array(
       s"-Dstorm.options=${getThriftOptions(stormConfig)}",
       s"-Dstorm.jar=$jar",
-      s"-Dstorm.config.file=$stormConfig",
+      s"-Dstorm.conf.file=$stormConfig",
       s"-D${PREFER_IPV4}=true"
     )
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
index 30c6d57..3baabf6 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
@@ -26,7 +26,7 @@ import io.gearpump.integrationtest.{TestSpecBase, Util}
   */
 class StormCompatibilitySpec extends TestSpecBase {
 
-  private lazy val stormClient = new StormClient(cluster.getMastersAddresses)
+  private lazy val stormClient = new StormClient(cluster.getMastersAddresses, restClient)
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -74,27 +74,33 @@ class StormCompatibilitySpec extends TestSpecBase {
 
   "Storm over Gearpump" should withStorm {
     stormVersion =>
-      s"support basic topologies ($stormVersion)" in {
+     s"support basic topologies ($stormVersion)" in {
         val stormJar = getStormJar(stormVersion)
         val topologyName = getTopologyName("exclamation", stormVersion)
+
         // exercise
-        val appId = stormClient.submitStormApp(stormJar,
-          mainClass = "storm.starter.ExclamationTopology", args = topologyName)
+        val appId = stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.ExclamationTopology",
+          args = topologyName,
+          appName = topologyName)
 
         // verify
-        val actual = expectAppIsRunning(appId, topologyName)
-        Util.retryUntil(restClient.queryStreamingAppDetail(actual.appId).clock > 0)
+        Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0)
       }
 
       s"support to run a python version of wordcount ($stormVersion)" in {
         val stormJar = getStormJar(stormVersion)
         val topologyName = getTopologyName("wordcount", stormVersion)
+
         // exercise
-        val appId = stormClient.submitStormApp(stormJar,
-          mainClass = "storm.starter.WordCountTopology", args = topologyName)
+        val appId = stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.WordCountTopology",
+          args = topologyName,
+          appName = topologyName)
 
         // verify
-        expectAppIsRunning(appId, topologyName)
         Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0)
       }
 
@@ -104,8 +110,11 @@ class StormCompatibilitySpec extends TestSpecBase {
         // input (user and follower) data are already prepared in memory
         val stormJar = getStormJar(stormVersion)
         val topologyName = getTopologyName("reach", stormVersion)
-        stormClient.submitStormApp(stormJar,
-          mainClass = "storm.starter.ReachTopology", args = topologyName)
+        stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.ReachTopology",
+          args = topologyName,
+          appName = topologyName)
         val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway)
 
         // verify
@@ -119,13 +128,16 @@ class StormCompatibilitySpec extends TestSpecBase {
       s"support tick tuple ($stormVersion)" in {
         val stormJar = getStormJar(stormVersion)
         val topologyName = getTopologyName("slidingWindowCounts", stormVersion)
+
         // exercise
-        val appId = stormClient.submitStormApp(stormJar,
-          mainClass = "storm.starter.RollingTopWords", args = s"$topologyName remote")
+        val appId = stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.RollingTopWords",
+          args = s"$topologyName remote",
+          appName = topologyName)
 
         // verify
-        val actual = expectAppIsRunning(appId, topologyName)
-        Util.retryUntil(restClient.queryStreamingAppDetail(actual.appId).clock > 0)
+        Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0)
       }
 
       s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in {
@@ -142,8 +154,6 @@ class StormCompatibilitySpec extends TestSpecBase {
             val brokerList = kafkaCluster.getBrokerListConnectString
             val sourceTopic = "topic1"
             val sinkTopic = "topic2"
-            val appsCount = restClient.listApps().length
-            val appId = appsCount + 1
 
             val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic,
               "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList,
@@ -155,11 +165,12 @@ class StormCompatibilitySpec extends TestSpecBase {
             // generate number sequence (1, 2, 3, ...) to the topic
             withDataProducer(sourceTopic, brokerList) { producer =>
 
-              stormClient.submitStormApp(stormJar,
+              val appId = stormClient.submitStormApp(
+                jar = stormJar,
                 mainClass = stormKafkaTopology,
-                args = args.mkString(" "))
+                args = args.mkString(" "),
+                appName = topologyName)
 
-              expectAppIsRunning(appId, topologyName)
               Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0)
 
               // kill executor and verify at-least-once is guaranteed on application restart

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
index 944f542..c7a2c3e 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
@@ -32,8 +32,8 @@ class StandaloneModeSuite extends Suites(
   new RestServiceSpec,
   new ExampleSpec,
   new DynamicDagSpec,
-  new StabilitySpec/*,
-  new StormCompatibilitySpec*/
+  new StabilitySpec,
+  new StormCompatibilitySpec
 
 ) with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
index 15acf4e..a018280 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
@@ -18,46 +18,37 @@
 
 package io.gearpump.integrationtest.storm
 
-
 import backtype.storm.utils.{Utils, DRPCClient}
-import io.gearpump.integrationtest.Docker
-import io.gearpump.integrationtest.minicluster.BaseContainer
-import org.apache.log4j.Logger
-
+import io.gearpump.integrationtest.{Util, Docker}
+import io.gearpump.integrationtest.minicluster.{RestClient, BaseContainer}
 
-class StormClient(masterAddrs: Seq[(String, Int)]) {
+class StormClient(masterAddrs: Seq[(String, Int)], restClient: RestClient) {
 
-  private val LOG = Logger.getLogger(getClass)
-  private val STORM_HOST = "storm0"
-  private val STORM_NIMBUS = "/opt/start storm nimbus"
-  private val STORM_APP = "/opt/start storm app"
-  private val STORM_DRPC = "storm-drpc"
-  private val CONFIG_FILE = "storm.yaml"
+  private val CONFIG_FILE = "/opt/gearpump/storm.yaml"
+  private val DRPC_HOST = "storm0"
   private val DRPC_PORT = 3772
   private val DRPC_INVOCATIONS_PORT = 3773
+  private val STORM_DRPC = "storm-drpc"
+  private val NIMBUS_HOST = "storm1"
+  private val STORM_NIMBUS = "storm nimbus"
+  private val STORM_APP = "/opt/start storm app"
 
-  private val container = new BaseContainer(STORM_HOST, STORM_DRPC, masterAddrs,
+  private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs,
     tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
+  private val nimbusContainer = new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs)
 
   def start(): Unit = {
-    container.createAndStart()
-    startNimbus
-  }
-
-  private def startNimbus: String = {
-    Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE")
+    drpcContainer.createAndStart()
+    nimbusContainer.createAndStart()
   }
 
-  def submitStormApp(jar: String, mainClass: String, args: String = ""): Int = {
-    try {
-      Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_APP -config $CONFIG_FILE " +
-        s"-jar $jar $mainClass $args").split("\n")
-        .filter(_.contains("The application id is ")).head.split(" ").last.toInt
-    } catch {
-      case ex: Throwable =>
-        LOG.warn(s"swallowed an exception: $ex")
-        -1
-    }
+  def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = {
+    Util.retryUntil({
+      Docker.exec(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
+        s"-jar $jar $mainClass $args")
+      restClient.listRunningApps().exists(_.appName == appName)
+    })
+    restClient.listRunningApps().filter(_.appName == appName).head.appId
   }
 
   def getDRPCClient(drpcServerIp: String): DRPCClient = {
@@ -66,7 +57,8 @@ class StormClient(masterAddrs: Seq[(String, Int)]) {
   }
 
   def shutDown(): Unit = {
-    container.killAndRemove()
+    drpcContainer.killAndRemove()
+    nimbusContainer.killAndRemove()
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 830e7c2..3f91f1e 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -385,8 +385,7 @@ object Build extends sbt.Build {
           "commons-httpclient" % "commons-httpclient" % commonsHttpVersion,
           "org.apache.hadoop" % "hadoop-mapreduce-client-core" % clouderaVersion,
           "org.apache.hadoop" % "hadoop-yarn-server-resourcemanager" % clouderaVersion % "provided",
-          "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % clouderaVersion % "provided",
-          "org.specs2" %% "specs2-mock" % "3.6.4" % "test"
+          "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % clouderaVersion % "provided"
         )
       )
   ) dependsOn(services % "test->test;compile->compile", core % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
index 578a15d..67fd592 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
@@ -348,8 +348,6 @@ object ClockService {
     def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = {
       taskClocks(taskIndex) = clock
       _min = Longs.min(taskClocks: _*)
-
-      Console.print(s"min clock of $processorId is ${_min}")
     }
   }