You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ap...@apache.org on 2016/04/11 20:26:04 UTC
[02/50] incubator-gearpump git commit: fix #1895,
make GearpumpNimbus a standalone service
fix #1895, make GearpumpNimbus a standalone service
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/502dbae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/502dbae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/502dbae9
Branch: refs/heads/master
Commit: 502dbae996440cfde7b2b9473a44af9cd1714cc8
Parents: 20c62e5
Author: manuzhang <ow...@gmail.com>
Authored: Fri Jan 29 16:55:53 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Feb 1 16:14:27 2016 +0800
----------------------------------------------------------------------
docs/dev-storm.md | 40 ++-
.../gearpump/experiments/storm/Commands.scala | 36 ---
.../storm/GearpumpThriftServer.scala | 143 -----------
.../experiments/storm/StormRunner.scala | 137 ++---------
.../experiments/storm/main/GearpumpNimbus.scala | 245 +++++++++++++++++++
.../storm/main/GearpumpStormClient.scala | 69 ++++++
.../storm/topology/GearpumpStormTopology.scala | 52 +---
.../experiments/storm/GearpumpNimbusSpec.scala | 68 -----
.../storm/GearpumpThriftServerSpec.scala | 48 ----
.../experiments/storm/StormRunnerSpec.scala | 58 -----
.../topology/GearpumpStormTopologySpec.scala | 20 +-
.../integrationtest/storm/StormClient.scala | 14 +-
project/Pack.scala | 2 +-
.../io/gearpump/services/MasterService.scala | 2 +-
14 files changed, 397 insertions(+), 537 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/docs/dev-storm.md
----------------------------------------------------------------------
diff --git a/docs/dev-storm.md b/docs/dev-storm.md
index a05c4ed..a22ed6c 100644
--- a/docs/dev-storm.md
+++ b/docs/dev-storm.md
@@ -77,23 +77,43 @@ This section shows how to run an existing Storm jar in a local Gearpump cluster.
1. launch a local cluster
```
- ./target/pack/bin/local
+ bin/local
```
-2. submit a topology from storm-starter.
+2. start a local Gearpump Nimbus server
+
+ Users need the server's Thrift port to submit topologies later. The Thrift port is written to a YAML config file whose path is set with the `-output` option.
+ Users can provide an existing config file, in which case only `nimbus.thrift.port` is overwritten. If no file is provided, a new file `app.yaml` is created with the config.
+
```
- bin/storm -verbose -config storm.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation
+ bin/storm nimbus [-output <custom yaml config>]
```
+
+3. submit Storm applications
+
+ Users can submit Storm applications through either the command line or the UI.
+
+ a. submit Storm applications through command line
+
+
+ ```
+ bin/storm app -verbose -config app.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation
+ ```
- Users are able to configure their applications through following options
+ Users are able to configure their applications through the following options
- * `jar` - set the path of a storm application jar
- * `config` - submit a customized storm configuration file
+ * `jar` - set the path of a Storm application jar
+ * `config` - submit the custom configuration file generated when launching Nimbus
- That's it. Check the dashboard and you should see data flowing through your topology.
+ b. submit Storm application through UI
- *Note that submission from UI is not supported yet*.
+ 1. Click on the "Create" button on the applications page on UI.
+ 2. Click on the "Submit Storm Application" item in the pull down menu.
+ 3. In the popup console, upload the Storm application jar and the configuration file generated when launching Nimbus,
+ and fill in `storm.starter.ExclamationTopology exclamation` as arguments.
+ 4. Click on the "Submit" button
+ Either way, check the dashboard and you should see data flowing through your topology.
## How is it different from running on Storm
@@ -145,11 +165,11 @@ Gearpump has flow control between tasks such that [sender cannot flood receiver]
All Storm configurations are respected with the following priority order
```
-defaults.yaml < storm.yaml < application config < component config < custom user config
+defaults.yaml < custom file config < application config < component config
```
where
* application config is submitted from the Storm application along with the topology
* component config is set in spout / bolt with `getComponentConfiguration`
-* custom user config is specified with the `-config` option when submitting Storm application from command line
\ No newline at end of file
+* custom file config is specified with the `-config` option when submitting a Storm application from the command line, or uploaded from the UI
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala
deleted file mode 100644
index 1d5b903..0000000
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm
-
-import backtype.storm.generated.{KillOptions, StormTopology, SubmitOptions}
-
-object Commands {
-
- case class Submit(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions)
-
- case class AppSubmitted(name: String, appId: Int)
-
- case class Kill(name: String, option: KillOptions)
-
- case class AppKilled(name: String, appId: Int)
-
- case object GetClusterInfo
-
- case class GetTopology(id: String)
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala
deleted file mode 100644
index 640c20c..0000000
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm
-
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-import java.util.{HashMap => JHashMap, Map => JMap}
-
-import akka.actor.ActorRef
-import backtype.storm.Config
-import backtype.storm.generated._
-import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
-import backtype.storm.utils.Utils
-import io.gearpump.experiments.storm.Commands.{GetClusterInfo, _}
-import io.gearpump.experiments.storm.util.StormUtil
-import io.gearpump.util.ActorUtil.askActor
-
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-object GearpumpThriftServer {
- val THRIFT_PORT = StormUtil.getThriftPort
- implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-
- private def createServer(handler: ActorRef): ThriftServer = {
- val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(handler))
- val connectionType = ThriftConnectionType.NIMBUS
- val config = Utils.readDefaultConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
- config.put(Config.NIMBUS_THRIFT_PORT, s"$THRIFT_PORT")
- new ThriftServer(config, processor, connectionType)
- }
-
- def apply(handler: ActorRef): GearpumpThriftServer = {
- new GearpumpThriftServer(createServer(handler))
- }
-
- class GearpumpNimbus(handler: ActorRef) extends Nimbus.Iface {
-
- override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = {
- val ask = askActor(handler, Submit(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE)))
- Await.result(ask, 30 seconds)
- }
-
- override def killTopologyWithOpts(name: String, options: KillOptions): Unit = {
- Await.result(askActor(handler,Kill(name, options)), 10 seconds)
- }
-
- override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = {
- Await.result(askActor(handler,Submit(name, uploadedJarLocation, jsonConf, topology, options)), 10 seconds)
- }
-
- override def uploadChunk(location: String, chunk: ByteBuffer): Unit = {
- }
-
- override def getNimbusConf: String = {
- throw new UnsupportedOperationException
- }
-
- override def getTopology(id: String): StormTopology = {
- Await.result(askActor[StormTopology](handler, GetTopology(id)), 10 seconds)
- }
-
- override def getTopologyConf(id: String): String = {
- throw new UnsupportedOperationException
- }
-
- override def beginFileDownload(file: String): String = {
- throw new UnsupportedOperationException
- }
-
- override def getUserTopology(id: String): StormTopology = getTopology(id)
-
- override def activate(name: String): Unit = {
- throw new UnsupportedOperationException
- }
-
- override def rebalance(name: String, options: RebalanceOptions): Unit = {
- throw new UnsupportedOperationException
- }
-
- override def deactivate(name: String): Unit = {
- throw new UnsupportedOperationException
- }
-
- override def getTopologyInfo(id: String): TopologyInfo = {
- throw new UnsupportedOperationException
- }
-
- override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = {
- throw new UnsupportedOperationException
- }
-
- override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions())
-
- override def downloadChunk(id: String): ByteBuffer = {
- throw new UnsupportedOperationException
- }
-
- override def beginFileUpload(): String = {
- "local thrift server"
- }
-
- override def getClusterInfo: ClusterSummary = {
- Await.result(askActor[ClusterSummary](handler, GetClusterInfo), 10 seconds)
- }
-
- override def finishFileUpload(location: String): Unit = {
- }
-
- override def uploadNewCredentials(s: String, credentials: Credentials): Unit = {
- throw new UnsupportedOperationException
- }
- }
-}
-
-class GearpumpThriftServer(server: ThriftServer) extends Thread {
-
- override def run(): Unit = {
- server.serve()
- }
-
- def close(): Unit = {
- server.stop()
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
index 84d9460..dd8d782 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
@@ -18,131 +18,36 @@
package io.gearpump.experiments.storm
-import java.io.File
-import java.util.{Map => JMap}
+import io.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient}
+import io.gearpump.util.LogUtil
+import org.slf4j.Logger
-import akka.actor.{Actor, ActorSystem, Props}
-import backtype.storm.Config
-import backtype.storm.generated.{ClusterSummary, StormTopology, SupervisorSummary, TopologySummary}
-import com.typesafe.config.ConfigValueFactory
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.experiments.storm.Commands.{GetClusterInfo, _}
-import io.gearpump.experiments.storm.topology.GearpumpStormTopology
-import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants}
-import io.gearpump.streaming.StreamApplication
-import io.gearpump.util.Constants._
-import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+object StormRunner {
+ private val LOG: Logger = LogUtil.getLogger(getClass)
-import scala.collection.JavaConverters._
+ private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient)
-object StormRunner extends AkkaApp with ArgumentsParser {
- override val options: Array[(String, CLIOption[Any])] = Array(
- "jar" -> CLIOption[String]("<storm jar>", required = true),
- "config" -> CLIOption[String]("<storm config path>", required = false),
- "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)))
-
- override val remainArgs = Array("topology_name")
-
- override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
-
- val akkaConf = updateConfig(inputAkkaConf)
- val config = parse(args)
-
- val verbose = config.getBoolean("verbose")
- if (verbose) {
- LogUtil.verboseLogToConsole
- }
-
- val jar = config.getString("jar")
- val stormConfig = config.getString("config")
- val topology = config.remainArgs(0)
- val stormArgs = config.remainArgs.drop(1)
-
- val system = ActorSystem("storm", akkaConf)
- val clientContext = new ClientContext(akkaConf, system, null)
-
- val gearpumpNimbus = system.actorOf(Props(new Handler(clientContext, jar, stormConfig)))
- val thriftServer = GearpumpThriftServer(gearpumpNimbus)
- thriftServer.start()
-
- val stormOptions = Array("-Dstorm.options=" +
- s"${Config.NIMBUS_HOST}=127.0.0.1,${Config.NIMBUS_THRIFT_PORT}=${GearpumpThriftServer.THRIFT_PORT}",
- "-Dstorm.jar=" + jar,
- s"-D${PREFER_IPV4}=true"
- )
-
- val classPath = Array(System.getProperty("java.class.path"), jar)
- val process = Util.startProcess(stormOptions, classPath, topology, stormArgs)
-
- // wait till the process exit
- val exit = process.exitValue()
-
- thriftServer.close()
- clientContext.close()
- system.shutdown()
- system.awaitTermination()
-
- if (exit != 0) {
- throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}")
- }
+ private def usage: Unit = {
+ println(commands)
+ val keys = commands.keys.toList.sorted
+ Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
}
- import Constants._
- private def updateConfig(config: Config): Config = {
- val storm = s"<${GEARPUMP_HOME}>/lib/storm/*"
- val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
- val executorClassPath = s"$storm${File.pathSeparator}" + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
-
- val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath))
- .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath))
-
- if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) {
- val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
- updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig)
+ private def executeCommand(command : String, commandArgs : Array[String]): Unit = {
+ if (!commands.contains(command)) {
+ usage
} else {
- updated
+ commands(command).main(commandArgs)
}
}
-
- class Handler(clientContext: ClientContext, jar: String, fileConfig: String) extends Actor {
- private var applications = Map.empty[String, Int]
- private var topologies = Map.empty[String, StormTopology]
- private val LOG = LogUtil.getLogger(classOf[Handler])
-
- implicit val system = context.system
-
- def receive: Receive = {
- case Kill(name, option) =>
- topologies -= name
- clientContext.shutdown(applications.getOrElse(name, throw new RuntimeException(s"topology $name not found")))
- val appId = applications(name)
- applications -= name
- LOG.info(s"Killed topology $name")
- sender ! AppKilled(name, appId)
- case Submit(name, uploadedJarLocation, appConfig, topology, option) =>
- topologies += name -> topology
-
- val gearpumpStormTopology = GearpumpStormTopology(topology, appConfig, fileConfig)
- val stormConfig = gearpumpStormTopology.getStormConfig
- val processorGraph = GraphBuilder.build(gearpumpStormTopology)
- val config = UserConfig.empty
- .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
- .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
- val app = StreamApplication(name, processorGraph, config)
- val appId = clientContext.submit(app, jar)
- applications += name -> appId
- LOG.info(s"Storm Application $appId submitted")
- sender ! AppSubmitted(name, appId)
- case GetClusterInfo =>
- val topologySummaryList = topologies.map { case (name, _) =>
- new TopologySummary(name, name, 0, 0, 0, 0, "")
- }.toSeq
- sender ! new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava)
- case GetTopology(id) =>
- sender ! topologies(id)
+ def main(args: Array[String]) = {
+ if (args.length == 0) {
+ usage
+ } else {
+ val command = args(0)
+ val commandArgs = args.drop(1)
+ executeCommand(command, commandArgs)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
new file mode 100644
index 0000000..4204580
--- /dev/null
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.experiments.storm.main
+
+import java.io.{File, FileOutputStream, FileWriter}
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, WritableByteChannel}
+import java.util.{Map => JMap, UUID}
+
+import akka.actor.ActorSystem
+import backtype.storm.Config
+import backtype.storm.generated._
+import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
+import backtype.storm.utils.TimeCacheMap.ExpiredCallback
+import backtype.storm.utils.{TimeCacheMap, Utils}
+import com.typesafe.config.ConfigValueFactory
+import io.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.experiments.storm.topology.GearpumpStormTopology
+import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil}
+import io.gearpump.streaming.StreamApplication
+import io.gearpump.util.{FileUtils, AkkaApp, Constants, LogUtil}
+import org.apache.storm.shade.org.json.simple.JSONValue
+import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
+import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+
+object GearpumpNimbus extends AkkaApp with ArgumentsParser {
+ private val THRIFT_PORT = StormUtil.getThriftPort
+ private val OUTPUT = "output"
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ OUTPUT -> CLIOption[String]("<output path for configuration file>", required = false, defaultValue = Some("app.yaml"))
+ )
+
+ override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+ val parsed = parse(args)
+ val output = parsed.getString(OUTPUT)
+ val akkaConf = updateClientConfig(inputAkkaConf)
+ val system = ActorSystem("storm", akkaConf)
+ val clientContext = new ClientContext(akkaConf, system, null)
+ val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
+ val thriftConf: JMap[String, String] = Map(Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava
+ updateOutputStormConfig(thriftConf, output)
+ stormConf.putAll(thriftConf)
+ val thriftServer = createServer(clientContext, stormConf)
+ thriftServer.serve()
+ system.awaitTermination()
+ }
+
+ private def createServer(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = {
+ val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext, stormConf))
+ val connectionType = ThriftConnectionType.NIMBUS
+ new ThriftServer(stormConf, processor, connectionType)
+ }
+
+ private def updateOutputStormConfig(conf: JMap[String, String], output: String): Unit = {
+ // read existing config
+ val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]]
+ outputConfig.putAll(conf)
+ val yaml = new Yaml(new SafeConstructor)
+ val writer = new FileWriter(new File(output))
+ yaml.dump(outputConfig, writer)
+ }
+
+ import Constants._
+ private def updateClientConfig(config: Config): Config = {
+ val storm = s"<${GEARPUMP_HOME}>/lib/storm/*"
+ val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
+ val executorClassPath = s"$storm${File.pathSeparator}" + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
+
+ val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath))
+ .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath))
+
+ if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) {
+ val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
+ updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig)
+ } else {
+ updated
+ }
+ }
+
+}
+
+class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) extends Nimbus.Iface {
+ private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
+ private var applications = Map.empty[String, Int]
+ private var topologies = Map.empty[String, (StormTopology, JMap[AnyRef, AnyRef])]
+ private val expireSeconds = StormUtil.getInt(stormConf, Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get
+ private val expiredCallback = new ExpiredCallback[String, WritableByteChannel] {
+ override def expire(k: String, v: WritableByteChannel): Unit = {
+ v.close()
+ }
+ }
+ private val fileCacheMap = new TimeCacheMap[String, WritableByteChannel](expireSeconds, expiredCallback)
+
+ override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = {
+ submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE))
+ }
+
+ override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = {
+
+ implicit val system = clientContext.system
+ val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
+ val stormConfig = gearpumpStormTopology.getStormConfig
+ val processorGraph = GraphBuilder.build(gearpumpStormTopology)
+ val config = UserConfig.empty
+ .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
+ .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
+ val app = StreamApplication(name, processorGraph, config)
+ LOG.info(s"jar file uploaded to $uploadedJarLocation")
+ val appId = clientContext.submit(app, uploadedJarLocation)
+ applications += name -> appId
+ topologies += name -> (topology, stormConfig)
+ LOG.info(s"Storm Application $appId submitted")
+ }
+
+ override def killTopologyWithOpts(name: String, options: KillOptions): Unit = {
+ clientContext.shutdown(applications.getOrElse(name, throw new RuntimeException(s"topology $name not found")))
+ applications -= name
+ topologies -= name
+ LOG.info(s"Killed topology $name")
+ }
+
+ override def getNimbusConf: String = {
+ JSONValue.toJSONString(stormConf)
+ }
+
+ override def getTopology(name: String): StormTopology = {
+ updateApps
+ topologies.getOrElse(name,
+ throw new RuntimeException(s"topology $name not found"))._1
+ }
+
+ override def getTopologyConf(name: String): String = {
+ updateApps
+ JSONValue.toJSONString(topologies.getOrElse(name,
+ throw new RuntimeException(s"topology $name not found"))._2)
+ }
+
+ override def getUserTopology(id: String): StormTopology = getTopology(id)
+
+ override def beginFileUpload(): String = {
+ val location = s"stormjar-${UUID.randomUUID()}.jar"
+ val channel = Channels.newChannel(new FileOutputStream(location))
+ fileCacheMap.put(location, channel)
+ LOG.info(s"Uploading file from client to $location")
+ location
+ }
+
+ override def uploadChunk(location: String, chunk: ByteBuffer): Unit = {
+ if (!fileCacheMap.containsKey(location)) {
+ throw new RuntimeException(s"File for $location does not exist (or timed out)")
+ } else {
+ val channel = fileCacheMap.get(location)
+ channel.write(chunk)
+ fileCacheMap.put(location, channel)
+ }
+ }
+
+ override def finishFileUpload(location: String): Unit = {
+ if (!fileCacheMap.containsKey(location)) {
+ throw new RuntimeException(s"File for $location does not exist (or timed out)")
+ } else {
+ val channel = fileCacheMap.get(location)
+ channel.close()
+ fileCacheMap.remove(location)
+ new File(location).delete()
+ }
+ }
+
+ override def getClusterInfo: ClusterSummary = {
+ updateApps
+ val topologySummaryList = topologies.map { case (name, _) =>
+ new TopologySummary(name, name, 0, 0, 0, 0, "")
+ }.toSeq
+ new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava)
+ }
+
+ override def beginFileDownload(file: String): String = {
+ throw new UnsupportedOperationException
+ }
+
+ override def uploadNewCredentials(s: String, credentials: Credentials): Unit = {
+ throw new UnsupportedOperationException
+ }
+ override def activate(name: String): Unit = {
+ throw new UnsupportedOperationException
+ }
+
+ override def rebalance(name: String, options: RebalanceOptions): Unit = {
+ throw new UnsupportedOperationException
+ }
+
+ override def deactivate(name: String): Unit = {
+ throw new UnsupportedOperationException
+ }
+
+ override def getTopologyInfo(name: String): TopologyInfo = {
+ throw new UnsupportedOperationException
+ }
+
+ override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = {
+ throw new UnsupportedOperationException
+ }
+
+ override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions())
+
+ override def downloadChunk(name: String): ByteBuffer = {
+ throw new UnsupportedOperationException
+ }
+
+ private def updateApps: Unit = {
+ clientContext.listApps.appMasters.foreach { app =>
+ val name = app.appName
+ if (applications.contains(app.appName)) {
+ if (app.status != MasterToAppMaster.AppMasterActive) {
+ applications -= name
+ topologies -= name
+ }
+ }
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
new file mode 100644
index 0000000..70efbf3
--- /dev/null
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.experiments.storm.main
+
+import backtype.storm.Config
+import backtype.storm.utils.Utils
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.util.Constants._
+import io.gearpump.util.{AkkaApp, LogUtil, Util}
+
+object GearpumpStormClient extends AkkaApp with ArgumentsParser {
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "jar" -> CLIOption[String]("<storm jar>", required = true),
+ "config" -> CLIOption[Int]("<storm config file>", required = true),
+ "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)))
+
+ override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+
+ val verbose = config.getBoolean("verbose")
+ if (verbose) {
+ LogUtil.verboseLogToConsole
+ }
+
+ val jar = config.getString("jar")
+ val stormConfig = config.getString("config")
+ val topology = config.remainArgs(0)
+ val stormArgs = config.remainArgs.drop(1)
+ val stormOptions = Array(
+ s"-Dstorm.options=${getThriftOptions(stormConfig)}",
+ s"-Dstorm.jar=$jar",
+ s"-Dstorm.config.file=$stormConfig",
+ s"-D${PREFER_IPV4}=true"
+ )
+
+ val classPath = Array(s"${System.getProperty(GEARPUMP_HOME)}/lib/storm/*", jar)
+ val process = Util.startProcess(stormOptions, classPath, topology, stormArgs)
+
+ // wait till the process exit
+ val exit = process.exitValue()
+
+ if (exit != 0) {
+ throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}")
+ }
+ }
+
+ private def getThriftOptions(stormConfig: String): String = {
+ val config = Utils.findAndReadConfigFile(stormConfig, true)
+ val thriftPort = config.get(Config.NIMBUS_THRIFT_PORT)
+ s"${Config.NIMBUS_HOST}=127.0.0.1,${Config.NIMBUS_THRIFT_PORT}=$thriftPort"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
index 996498b..b88a200 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
@@ -18,7 +18,6 @@
package io.gearpump.experiments.storm.topology
-import java.io._
import java.lang.{Iterable => JIterable}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
@@ -35,8 +34,6 @@ import io.gearpump.experiments.storm.util.StormUtil._
import io.gearpump.streaming.Processor
import io.gearpump.streaming.task.Task
import io.gearpump.util.LogUtil
-import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
-import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor
import org.slf4j.Logger
import scala.collection.JavaConversions._
@@ -45,43 +42,18 @@ object GearpumpStormTopology {
private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology])
def apply(
+ name: String,
topology: StormTopology,
- appConfigInJson: String,
- fileConfig: String)(implicit system: ActorSystem): GearpumpStormTopology = {
+ appConfigInJson: String)(implicit system: ActorSystem): GearpumpStormTopology = {
new GearpumpStormTopology(
+ name,
topology,
Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]],
- parseJsonStringToMap(appConfigInJson),
- readStormConfig(fileConfig)
+ parseJsonStringToMap(appConfigInJson)
)
}
- /**
- * @param configFile user provided local config file
- * @return a config Map loaded from config file
- */
- private def readStormConfig(configFile: String): JMap[AnyRef, AnyRef] = {
- var ret: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
- try {
- val yaml = new Yaml(new SafeConstructor)
- val input: InputStream = new FileInputStream(configFile)
- try {
- ret = yaml.load(new InputStreamReader(input)).asInstanceOf[JMap[AnyRef, AnyRef]]
- } catch {
- case e: IOException =>
- LOG.warn(s"failed to load config file $configFile")
- } finally {
- input.close()
- }
- } catch {
- case e: FileNotFoundException =>
- LOG.warn(s"failed to find config file $configFile")
- case t: Throwable =>
- LOG.error(t.getMessage)
- }
- ret
- }
}
/**
@@ -91,21 +63,20 @@ object GearpumpStormTopology {
* 3. provides interface for Gearpump applications to use Storm topology
*
* an implicit ActorSystem is required to create Gearpump processors
+ * @param name topology name
* @param topology Storm topology
- * @param sysConfig configs from "defaults.yaml" and "storm.yaml"
+ * @param sysConfig configs from "defaults.yaml" and custom config file
* @param appConfig config submitted from user application
- * @param fileConfig custom file config set by user in command line
*/
private[storm] class GearpumpStormTopology(
+ name: String,
topology: StormTopology,
sysConfig: JMap[AnyRef, AnyRef],
- appConfig: JMap[AnyRef, AnyRef],
- fileConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) {
- import io.gearpump.experiments.storm.topology.GearpumpStormTopology._
+ appConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) {
private val spouts = topology.get_spouts()
private val bolts = topology.get_bolts()
- private val stormConfig = mergeConfigs(sysConfig, appConfig, fileConfig, getComponentConfigs(spouts, bolts))
+ private val stormConfig = mergeConfigs(sysConfig, appConfig, getComponentConfigs(spouts, bolts))
private val spoutProcessors = spouts.map { case (id, spout) =>
id -> spoutToProcessor(id, spout, stormConfig.toMap) }.toMap
private val boltProcessors = bolts.map { case (id, bolt) =>
@@ -114,7 +85,7 @@ private[storm] class GearpumpStormTopology(
/**
* @return merged Storm config with priority
- * defaults.yaml < storm.yaml < application config < component config < custom file config
+ * defaults.yaml < custom file config < application config < component config
*/
def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig
@@ -139,13 +110,12 @@ private[storm] class GearpumpStormTopology(
private def mergeConfigs(
sysConfig: JMap[AnyRef, AnyRef],
appConfig: JMap[AnyRef, AnyRef],
- fileConfig: JMap[AnyRef, AnyRef],
componentConfigs: Iterable[JMap[AnyRef, AnyRef]]): JMap[AnyRef, AnyRef] = {
val allConfig = new JHashMap[AnyRef, AnyRef]
allConfig.putAll(sysConfig)
allConfig.putAll(appConfig)
allConfig.putAll(getMergedComponentConfig(componentConfigs, allConfig.toMap))
- allConfig.putAll(fileConfig)
+ allConfig.put(Config.TOPOLOGY_NAME, name)
allConfig
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala
deleted file mode 100644
index 20b63b3..0000000
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import backtype.storm.generated.StormTopology
-import io.gearpump.experiments.storm.util.TopologyUtil
-import io.gearpump.cluster.TestUtil
-import Commands.{GetTopology, Kill, Submit}
-import GearpumpThriftServer.GearpumpNimbus
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, WordSpec}
-
-import scala.concurrent.Future
-
-class GearpumpNimbusSpec extends WordSpec with Matchers with MockitoSugar {
-
- "GearpumpNimbus" should {
- "submit and kill topology through ClientContext" in {
-
- implicit val system = ActorSystem("storm-test", TestUtil.DEFAULT_CONFIG)
- implicit val dispatcher = system.dispatcher
-
- val handler = TestProbe()
- val gearpumpNimbus = new GearpumpNimbus(handler.ref)
-
- val appId = 0
- val name = "test"
- val uploadedJarLocation = "local"
- val jsonConf = "storm_json_conf"
- val topology = TopologyUtil.getTestTopology
-
- Future(gearpumpNimbus.submitTopology(name, uploadedJarLocation, jsonConf, topology))
- handler.expectMsgType[Submit]
-
- Future(gearpumpNimbus.getTopology(name))
- handler.expectMsgType[GetTopology]
- handler.reply(new StormTopology)
-
- Future(gearpumpNimbus.getUserTopology(name))
- handler.expectMsgType[GetTopology]
- handler.reply(new StormTopology)
-
- Future(gearpumpNimbus.killTopology(name))
- handler.expectMsgType[Kill]
-
- system.shutdown()
- system.awaitTermination()
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala
deleted file mode 100644
index b1736f0..0000000
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm
-
-import backtype.storm.security.auth.ThriftServer
-import org.mockito.Mockito._
-import org.scalatest.WordSpec
-import org.scalatest.mock.MockitoSugar
-
-class GearpumpThriftServerSpec extends WordSpec with MockitoSugar {
-
- "GearpumpThriftServer" should {
- "run ThriftServer.serve" in {
- val tServer = mock[ThriftServer]
- val thriftServer = new GearpumpThriftServer(tServer)
-
- thriftServer.run()
-
- verify(tServer).serve()
- }
-
- "stop ThriftServer" in {
- val tServer = mock[ThriftServer]
- val thriftServer = new GearpumpThriftServer(tServer)
-
- thriftServer.close()
-
- verify(tServer).stop()
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala
deleted file mode 100644
index 95cf279..0000000
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package io.gearpump.experiments.storm
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.experiments.storm.Commands.{AppKilled, AppSubmitted, Kill, Submit}
-import io.gearpump.experiments.storm.StormRunner.Handler
-import io.gearpump.experiments.storm.util.TopologyUtil
-import org.mockito.Matchers._
-import org.mockito.Mockito
-import org.scalatest.{FlatSpec, Matchers}
-
-class StormRunnerSpec extends FlatSpec with Matchers {
- it should "handle submit/kill correctly" in {
- val conf = TestUtil.DEFAULT_CONFIG
- implicit val system = ActorSystem("storm-test", conf)
-
- val uploadedJarLocation = "local"
- val jsonConf = "storm_json_conf"
- val topology = TopologyUtil.getTestTopology
-
- implicit val dispatcher = system.dispatcher
- val clientContext = Mockito.mock(classOf[ClientContext])
- Mockito.when(clientContext.submit(any(), any())).thenReturn(0)
- val handler = system.actorOf(Props(new Handler(clientContext, "jar", "user_config")))
- val client = TestProbe()
-
- client.send(handler, Submit("app", uploadedJarLocation, jsonConf, topology, null))
- client.expectMsg(AppSubmitted("app", 0))
-
-
- client.send(handler, Kill("app", null))
- client.expectMsg(AppKilled("app", 0))
-
- system.shutdown()
- system.awaitTermination()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
index d1c330b..8f10886 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -20,6 +20,7 @@ package io.gearpump.experiments.storm.topology
import java.util.{HashMap => JHashMap, Map => JMap}
+import backtype.storm.Config
import io.gearpump.experiments.storm.processor.StormProcessor
import io.gearpump.experiments.storm.producer.StormProducer
import io.gearpump.experiments.storm.util.TopologyUtil
@@ -40,28 +41,27 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
val sysConfig = newJavaConfig(name, sysVal)
val appVal = "app"
val appConfig = newJavaConfig(name, appVal)
- val fileVal = "file"
- val fileConfig = newJavaConfig(name, fileVal)
implicit val system = MockUtil.system
- val topology1 = new GearpumpStormTopology(stormTopology, newEmptyConfig, newEmptyConfig, newEmptyConfig)
- topology1.getStormConfig shouldBe empty
+ val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, newEmptyConfig)
+ topology1.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology1"
+ topology1.getStormConfig should not contain name
- val topology2 = new GearpumpStormTopology(stormTopology, sysConfig, newEmptyConfig, newEmptyConfig)
+ val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, newEmptyConfig)
+ topology2.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology2"
topology2.getStormConfig.get(name) shouldBe sysVal
- val topology3 = new GearpumpStormTopology(stormTopology, sysConfig, appConfig, newEmptyConfig)
+ val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig)
+ topology3.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology3"
topology3.getStormConfig.get(name) shouldBe appVal
- val topology4 = new GearpumpStormTopology(stormTopology, sysConfig, appConfig, fileConfig)
- topology4.getStormConfig.get(name) shouldBe fileVal
}
"create Gearpump processors from Storm topology" in {
val stormTopology = TopologyUtil.getTestTopology
implicit val system = MockUtil.system
val gearpumpStormTopology =
- GearpumpStormTopology(stormTopology, null, "")
+ GearpumpStormTopology("app", stormTopology, null)
val processors = gearpumpStormTopology.getProcessors
stormTopology.get_spouts().foreach { case (spoutId, _) =>
val processor = processors(spoutId)
@@ -80,7 +80,7 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
implicit val system = MockUtil.system
val sysConfig = new JHashMap[AnyRef, AnyRef]
val gearpumpStormTopology =
- GearpumpStormTopology(stormTopology, null, "")
+ GearpumpStormTopology("app", stormTopology, null)
val targets0 = gearpumpStormTopology.getTargets("1")
targets0 should contain key "3"
targets0 should contain key "4"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
index ed792f4..15acf4e 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
@@ -20,7 +20,6 @@ package io.gearpump.integrationtest.storm
import backtype.storm.utils.{Utils, DRPCClient}
-import io.gearpump.experiments.storm.GearpumpThriftServer
import io.gearpump.integrationtest.Docker
import io.gearpump.integrationtest.minicluster.BaseContainer
import org.apache.log4j.Logger
@@ -30,23 +29,28 @@ class StormClient(masterAddrs: Seq[(String, Int)]) {
private val LOG = Logger.getLogger(getClass)
private val STORM_HOST = "storm0"
- private val STORM_CMD = "/opt/start storm"
+ private val STORM_NIMBUS = "/opt/start storm nimbus"
+ private val STORM_APP = "/opt/start storm app"
private val STORM_DRPC = "storm-drpc"
private val CONFIG_FILE = "storm.yaml"
- private val NIMBUS_THRIFT_PORT = GearpumpThriftServer.THRIFT_PORT
private val DRPC_PORT = 3772
private val DRPC_INVOCATIONS_PORT = 3773
private val container = new BaseContainer(STORM_HOST, STORM_DRPC, masterAddrs,
- tunnelPorts = Set(NIMBUS_THRIFT_PORT, DRPC_PORT, DRPC_INVOCATIONS_PORT))
+ tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
def start(): Unit = {
container.createAndStart()
+ startNimbus
+ }
+
+ private def startNimbus: String = {
+ Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE")
}
def submitStormApp(jar: String, mainClass: String, args: String = ""): Int = {
try {
- Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_CMD -config $CONFIG_FILE " +
+ Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_APP -config $CONFIG_FILE " +
s"-jar $jar $mainClass $args").split("\n")
.filter(_.contains("The application id is ")).head.split(" ").last.toInt
} catch {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index b242b36..a3852c4 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -59,7 +59,7 @@ object Pack extends sbt.Build {
"worker" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-DlogFilename=worker", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"),
"services" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"),
"yarnclient" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"),
- "storm" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost")
+ "storm" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}")
),
packLibDir := Map(
"lib" -> new ProjectsToPack(core.id, streaming.id),
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
index e2b2c47..aa90726 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
@@ -221,7 +221,7 @@ object MasterService {
*/
def submitStormApp(jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = {
submitAndDeleteTempFiles(
- "io.gearpump.experiments.storm.StormRunner",
+ "io.gearpump.experiments.storm.main.GearpumpStormClient",
argsArray = spaceSeparatedArgumentsToArray(args),
fileMap = Map("jar" -> jar, "config" -> stormConf).filter(_._2.isDefined).mapValues(_.get),
classPath = getStormApplicationClassPath,