You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ap...@apache.org on 2016/04/11 20:26:42 UTC
[40/50] incubator-gearpump git commit: fix #1975,
fix storm integration test
fix #1975, fix storm integration test
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/920ad262
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/920ad262
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/920ad262
Branch: refs/heads/master
Commit: 920ad262f9f9e88611d520df9b40d1ff8363f81c
Parents: 42806a9
Author: manuzhang <ow...@gmail.com>
Authored: Tue Feb 2 16:58:00 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Feb 24 16:37:01 2016 +0800
----------------------------------------------------------------------
.../experiments/storm/main/GearpumpNimbus.scala | 27 ++++++----
.../storm/main/GearpumpStormClient.scala | 2 +-
.../checklist/StormCompatibilitySpec.scala | 51 +++++++++++--------
.../suites/StandaloneModeSuite.scala | 4 +-
.../integrationtest/storm/StormClient.scala | 52 +++++++++-----------
project/Build.scala | 3 +-
.../streaming/appmaster/ClockService.scala | 2 -
7 files changed, 74 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
index 6d5b0aa..e9973e5 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -39,14 +39,15 @@ import io.gearpump.streaming.StreamApplication
import io.gearpump.util.{AkkaApp, Constants, LogUtil}
import org.apache.storm.shade.org.json.simple.JSONValue
import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
-import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor
import org.slf4j.Logger
import scala.collection.JavaConverters._
+import scala.concurrent.Future
object GearpumpNimbus extends AkkaApp with ArgumentsParser {
private val THRIFT_PORT = StormUtil.getThriftPort
private val OUTPUT = "output"
+ private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
override val options: Array[(String, CLIOption[Any])] = Array(
OUTPUT -> CLIOption[String]("<output path for configuration file>", required = false, defaultValue = Some("app.yaml"))
@@ -57,15 +58,20 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
val output = parsed.getString(OUTPUT)
val akkaConf = updateClientConfig(inputAkkaConf)
val system = ActorSystem("storm", akkaConf)
+
val clientContext = new ClientContext(akkaConf, system, null)
val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
- val thriftConf: JMap[String, String] = Map(
- Config.NIMBUS_HOST -> ("" + akkaConf.getString(Constants.GEARPUMP_HOSTNAME)),
- Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava
+ val thriftConf: JMap[AnyRef, AnyRef] = Map(
+ Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME),
+ Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava.asInstanceOf[JMap[AnyRef, AnyRef]]
updateStormConfig(thriftConf, output)
stormConf.putAll(thriftConf)
- val thriftServer = createServer(clientContext, stormConf)
- thriftServer.serve()
+
+ import scala.concurrent.ExecutionContext.Implicits.global
+ Future {
+ val thriftServer = createServer(clientContext, stormConf)
+ thriftServer.serve()
+ }
system.awaitTermination()
}
@@ -75,12 +81,12 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
new ThriftServer(stormConf, processor, connectionType)
}
- private def updateStormConfig(thriftConfig: JMap[String, String], output: String): Unit = {
+ private def updateStormConfig(thriftConfig: JMap[AnyRef, AnyRef], output: String): Unit = {
val updatedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]]
updatedConfig.putAll(outputConfig)
updatedConfig.putAll(thriftConfig)
- val yaml = new Yaml(new SafeConstructor)
+ val yaml = new Yaml
val serialized = yaml.dumpAsMap(updatedConfig)
val writer = new FileWriter(new File(output))
try {
@@ -112,7 +118,8 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
}
class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) extends Nimbus.Iface {
- private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
+ import io.gearpump.experiments.storm.main.GearpumpNimbus._
+
private var applications = Map.empty[String, Int]
private var topologies = Map.empty[String, TopologyData]
private val expireSeconds = StormUtil.getInt(stormConf, Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get
@@ -128,7 +135,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe
}
override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = {
-
+ LOG.info(s"Submitted topology $name")
implicit val system = clientContext.system
val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
val stormConfig = gearpumpStormTopology.getStormConfig
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
index 643e926..d4c0e8a 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
@@ -46,7 +46,7 @@ object GearpumpStormClient extends AkkaApp with ArgumentsParser {
val stormOptions = Array(
s"-Dstorm.options=${getThriftOptions(stormConfig)}",
s"-Dstorm.jar=$jar",
- s"-Dstorm.config.file=$stormConfig",
+ s"-Dstorm.conf.file=$stormConfig",
s"-D${PREFER_IPV4}=true"
)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
index 30c6d57..3baabf6 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
@@ -26,7 +26,7 @@ import io.gearpump.integrationtest.{TestSpecBase, Util}
*/
class StormCompatibilitySpec extends TestSpecBase {
- private lazy val stormClient = new StormClient(cluster.getMastersAddresses)
+ private lazy val stormClient = new StormClient(cluster.getMastersAddresses, restClient)
override def beforeAll(): Unit = {
super.beforeAll()
@@ -74,27 +74,33 @@ class StormCompatibilitySpec extends TestSpecBase {
"Storm over Gearpump" should withStorm {
stormVersion =>
- s"support basic topologies ($stormVersion)" in {
+ s"support basic topologies ($stormVersion)" in {
val stormJar = getStormJar(stormVersion)
val topologyName = getTopologyName("exclamation", stormVersion)
+
// exercise
- val appId = stormClient.submitStormApp(stormJar,
- mainClass = "storm.starter.ExclamationTopology", args = topologyName)
+ val appId = stormClient.submitStormApp(
+ jar = stormJar,
+ mainClass = "storm.starter.ExclamationTopology",
+ args = topologyName,
+ appName = topologyName)
// verify
- val actual = expectAppIsRunning(appId, topologyName)
- Util.retryUntil(restClient.queryStreamingAppDetail(actual.appId).clock > 0)
+ Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0)
}
s"support to run a python version of wordcount ($stormVersion)" in {
val stormJar = getStormJar(stormVersion)
val topologyName = getTopologyName("wordcount", stormVersion)
+
// exercise
- val appId = stormClient.submitStormApp(stormJar,
- mainClass = "storm.starter.WordCountTopology", args = topologyName)
+ val appId = stormClient.submitStormApp(
+ jar = stormJar,
+ mainClass = "storm.starter.WordCountTopology",
+ args = topologyName,
+ appName = topologyName)
// verify
- expectAppIsRunning(appId, topologyName)
Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0)
}
@@ -104,8 +110,11 @@ class StormCompatibilitySpec extends TestSpecBase {
// input (user and follower) data are already prepared in memory
val stormJar = getStormJar(stormVersion)
val topologyName = getTopologyName("reach", stormVersion)
- stormClient.submitStormApp(stormJar,
- mainClass = "storm.starter.ReachTopology", args = topologyName)
+ stormClient.submitStormApp(
+ jar = stormJar,
+ mainClass = "storm.starter.ReachTopology",
+ args = topologyName,
+ appName = topologyName)
val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway)
// verify
@@ -119,13 +128,16 @@ class StormCompatibilitySpec extends TestSpecBase {
s"support tick tuple ($stormVersion)" in {
val stormJar = getStormJar(stormVersion)
val topologyName = getTopologyName("slidingWindowCounts", stormVersion)
+
// exercise
- val appId = stormClient.submitStormApp(stormJar,
- mainClass = "storm.starter.RollingTopWords", args = s"$topologyName remote")
+ val appId = stormClient.submitStormApp(
+ jar = stormJar,
+ mainClass = "storm.starter.RollingTopWords",
+ args = s"$topologyName remote",
+ appName = topologyName)
// verify
- val actual = expectAppIsRunning(appId, topologyName)
- Util.retryUntil(restClient.queryStreamingAppDetail(actual.appId).clock > 0)
+ Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0)
}
s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in {
@@ -142,8 +154,6 @@ class StormCompatibilitySpec extends TestSpecBase {
val brokerList = kafkaCluster.getBrokerListConnectString
val sourceTopic = "topic1"
val sinkTopic = "topic2"
- val appsCount = restClient.listApps().length
- val appId = appsCount + 1
val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic,
"-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList,
@@ -155,11 +165,12 @@ class StormCompatibilitySpec extends TestSpecBase {
// generate number sequence (1, 2, 3, ...) to the topic
withDataProducer(sourceTopic, brokerList) { producer =>
- stormClient.submitStormApp(stormJar,
+ val appId = stormClient.submitStormApp(
+ jar = stormJar,
mainClass = stormKafkaTopology,
- args = args.mkString(" "))
+ args = args.mkString(" "),
+ appName = topologyName)
- expectAppIsRunning(appId, topologyName)
Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0)
// kill executor and verify at-least-once is guaranteed on application restart
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
index 944f542..c7a2c3e 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
@@ -32,8 +32,8 @@ class StandaloneModeSuite extends Suites(
new RestServiceSpec,
new ExampleSpec,
new DynamicDagSpec,
- new StabilitySpec/*,
- new StormCompatibilitySpec*/
+ new StabilitySpec,
+ new StormCompatibilitySpec
) with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
index 15acf4e..a018280 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
@@ -18,46 +18,37 @@
package io.gearpump.integrationtest.storm
-
import backtype.storm.utils.{Utils, DRPCClient}
-import io.gearpump.integrationtest.Docker
-import io.gearpump.integrationtest.minicluster.BaseContainer
-import org.apache.log4j.Logger
-
+import io.gearpump.integrationtest.{Util, Docker}
+import io.gearpump.integrationtest.minicluster.{RestClient, BaseContainer}
-class StormClient(masterAddrs: Seq[(String, Int)]) {
+class StormClient(masterAddrs: Seq[(String, Int)], restClient: RestClient) {
- private val LOG = Logger.getLogger(getClass)
- private val STORM_HOST = "storm0"
- private val STORM_NIMBUS = "/opt/start storm nimbus"
- private val STORM_APP = "/opt/start storm app"
- private val STORM_DRPC = "storm-drpc"
- private val CONFIG_FILE = "storm.yaml"
+ private val CONFIG_FILE = "/opt/gearpump/storm.yaml"
+ private val DRPC_HOST = "storm0"
private val DRPC_PORT = 3772
private val DRPC_INVOCATIONS_PORT = 3773
+ private val STORM_DRPC = "storm-drpc"
+ private val NIMBUS_HOST = "storm1"
+ private val STORM_NIMBUS = "storm nimbus"
+ private val STORM_APP = "/opt/start storm app"
- private val container = new BaseContainer(STORM_HOST, STORM_DRPC, masterAddrs,
+ private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs,
tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
+ private val nimbusContainer = new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs)
def start(): Unit = {
- container.createAndStart()
- startNimbus
- }
-
- private def startNimbus: String = {
- Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE")
+ drpcContainer.createAndStart()
+ nimbusContainer.createAndStart()
}
- def submitStormApp(jar: String, mainClass: String, args: String = ""): Int = {
- try {
- Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_APP -config $CONFIG_FILE " +
- s"-jar $jar $mainClass $args").split("\n")
- .filter(_.contains("The application id is ")).head.split(" ").last.toInt
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- -1
- }
+ def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = {
+ Util.retryUntil({
+ Docker.exec(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
+ s"-jar $jar $mainClass $args")
+ restClient.listRunningApps().exists(_.appName == appName)
+ })
+ restClient.listRunningApps().filter(_.appName == appName).head.appId
}
def getDRPCClient(drpcServerIp: String): DRPCClient = {
@@ -66,7 +57,8 @@ class StormClient(masterAddrs: Seq[(String, Int)]) {
}
def shutDown(): Unit = {
- container.killAndRemove()
+ drpcContainer.killAndRemove()
+ nimbusContainer.killAndRemove()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 830e7c2..3f91f1e 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -385,8 +385,7 @@ object Build extends sbt.Build {
"commons-httpclient" % "commons-httpclient" % commonsHttpVersion,
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % clouderaVersion,
"org.apache.hadoop" % "hadoop-yarn-server-resourcemanager" % clouderaVersion % "provided",
- "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % clouderaVersion % "provided",
- "org.specs2" %% "specs2-mock" % "3.6.4" % "test"
+ "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % clouderaVersion % "provided"
)
)
) dependsOn(services % "test->test;compile->compile", core % "provided")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
index 578a15d..67fd592 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
@@ -348,8 +348,6 @@ object ClockService {
def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = {
taskClocks(taskIndex) = clock
_min = Longs.min(taskClocks: _*)
-
- Console.print(s"min clock of $processorId is ${_min}")
}
}