You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ap...@apache.org on 2016/04/11 20:26:04 UTC

[02/50] incubator-gearpump git commit: fix #1895, make GearpumpNimbus a standalone service

fix #1895, make GearpumpNimbus a standalone service


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/502dbae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/502dbae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/502dbae9

Branch: refs/heads/master
Commit: 502dbae996440cfde7b2b9473a44af9cd1714cc8
Parents: 20c62e5
Author: manuzhang <ow...@gmail.com>
Authored: Fri Jan 29 16:55:53 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Feb 1 16:14:27 2016 +0800

----------------------------------------------------------------------
 docs/dev-storm.md                               |  40 ++-
 .../gearpump/experiments/storm/Commands.scala   |  36 ---
 .../storm/GearpumpThriftServer.scala            | 143 -----------
 .../experiments/storm/StormRunner.scala         | 137 ++---------
 .../experiments/storm/main/GearpumpNimbus.scala | 245 +++++++++++++++++++
 .../storm/main/GearpumpStormClient.scala        |  69 ++++++
 .../storm/topology/GearpumpStormTopology.scala  |  52 +---
 .../experiments/storm/GearpumpNimbusSpec.scala  |  68 -----
 .../storm/GearpumpThriftServerSpec.scala        |  48 ----
 .../experiments/storm/StormRunnerSpec.scala     |  58 -----
 .../topology/GearpumpStormTopologySpec.scala    |  20 +-
 .../integrationtest/storm/StormClient.scala     |  14 +-
 project/Pack.scala                              |   2 +-
 .../io/gearpump/services/MasterService.scala    |   2 +-
 14 files changed, 397 insertions(+), 537 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/docs/dev-storm.md
----------------------------------------------------------------------
diff --git a/docs/dev-storm.md b/docs/dev-storm.md
index a05c4ed..a22ed6c 100644
--- a/docs/dev-storm.md
+++ b/docs/dev-storm.md
@@ -77,23 +77,43 @@ This section shows how to run an existing Storm jar in a local Gearpump cluster.
 1. launch a local cluster
   
    ```
-   ./target/pack/bin/local
+   bin/local
    ```
 
-2. submit a topology from storm-starter. 
+2. start a local Gearpump Nimbus server 
+
+   Users need the server's Thrift port to submit topologies later. The Thrift port is written to a YAML config file specified with the `-output` option.
+   Users can provide an existing config file where only `nimbus.thrift.port` is overwritten. If not provided, a new file `app.yaml` is created with the config.
+
    ```
-   bin/storm -verbose -config storm.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation 
+   bin/storm nimbus [-output <custom yaml config>]
    ```
+   
+3. submit Storm applications
+  
+   Users can either submit Storm applications through command line or UI. 
+   
+   a. submit Storm applications through command line
+
+
+     ```
+     bin/storm app -verbose -config app.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation 
+     ```
   
-   Users are able to configure their applications through following options
+     Users are able to configure their applications through following options
    
-     * `jar` - set the path of a storm application jar
-     * `config` - submit a customized storm configuration file     
+       * `jar` - set the path of a Storm application jar
+       * `config` - submit the custom configuration file generated when launching Nimbus
   
-   That's it. Check the dashboard and you should see data flowing through your topology. 
+   b. submit Storm application through UI
    
-   *Note that submission from UI is not supported yet*. 
+      1. Click on the "Create" button on the applications page on UI. 
+      2. Click on the "Submit Storm Application" item in the pull down menu.
+      3. In the popup console, upload the Storm application jar and the configuration file generated when launching Nimbus,
+         and fill in `storm.starter.ExclamationTopology exclamation` as arguments.
+      4. Click on the "Submit" button   
 
+   Either way, check the dashboard and you should see data flowing through your topology. 
   
 ## How is it different from running on Storm
 
@@ -145,11 +165,11 @@ Gearpump has flow control between tasks such that [sender cannot flood receiver]
 All Storm configurations are respected with the following priority order 
 
 ```
-defaults.yaml < storm.yaml < application config < component config < custom user config
+defaults.yaml < custom file config < application config < component config
 ```
 
 where
 
 * application config is submit from Storm application along with the topology 
 * component config is set in spout / bolt with `getComponentConfiguration`
-* custom user config is specified with the `-config` option when submitting Storm application from command line
\ No newline at end of file
+* custom file config is specified with the `-config` option when submitting a Storm application from the command line, or uploaded from the UI
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala
deleted file mode 100644
index 1d5b903..0000000
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm
-
-import backtype.storm.generated.{KillOptions, StormTopology, SubmitOptions}
-
-object Commands {
-
-  case class Submit(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions)
-
-  case class AppSubmitted(name: String, appId: Int)
-
-  case class Kill(name: String, option: KillOptions)
-
-  case class AppKilled(name: String, appId: Int)
-
-  case object GetClusterInfo
-
-  case class GetTopology(id: String)
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala
deleted file mode 100644
index 640c20c..0000000
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm
-
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-import java.util.{HashMap => JHashMap, Map => JMap}
-
-import akka.actor.ActorRef
-import backtype.storm.Config
-import backtype.storm.generated._
-import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
-import backtype.storm.utils.Utils
-import io.gearpump.experiments.storm.Commands.{GetClusterInfo, _}
-import io.gearpump.experiments.storm.util.StormUtil
-import io.gearpump.util.ActorUtil.askActor
-
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-object GearpumpThriftServer {
-  val THRIFT_PORT = StormUtil.getThriftPort
-  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
-
-  private def createServer(handler: ActorRef): ThriftServer = {
-    val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(handler))
-    val connectionType = ThriftConnectionType.NIMBUS
-    val config = Utils.readDefaultConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
-    config.put(Config.NIMBUS_THRIFT_PORT, s"$THRIFT_PORT")
-    new ThriftServer(config, processor, connectionType)
-  }
-
-  def apply(handler: ActorRef): GearpumpThriftServer = {
-    new GearpumpThriftServer(createServer(handler))
-  }
-
-  class GearpumpNimbus(handler: ActorRef) extends Nimbus.Iface {
-
-    override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = {
-      val ask = askActor(handler, Submit(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE)))
-      Await.result(ask, 30 seconds)
-    }
-
-    override def killTopologyWithOpts(name: String, options: KillOptions): Unit = {
-      Await.result(askActor(handler,Kill(name, options)), 10 seconds)
-    }
-
-    override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = {
-      Await.result(askActor(handler,Submit(name, uploadedJarLocation, jsonConf, topology, options)), 10 seconds)
-    }
-
-    override def uploadChunk(location: String, chunk: ByteBuffer): Unit = {
-    }
-
-    override def getNimbusConf: String = {
-      throw new UnsupportedOperationException
-    }
-
-    override def getTopology(id: String): StormTopology = {
-      Await.result(askActor[StormTopology](handler, GetTopology(id)), 10 seconds)
-    }
-
-    override def getTopologyConf(id: String): String = {
-      throw new UnsupportedOperationException
-    }
-
-    override def beginFileDownload(file: String): String = {
-      throw new UnsupportedOperationException
-    }
-
-    override def getUserTopology(id: String): StormTopology = getTopology(id)
-
-    override def activate(name: String): Unit = {
-      throw new UnsupportedOperationException
-    }
-
-    override def rebalance(name: String, options: RebalanceOptions): Unit = {
-      throw new UnsupportedOperationException
-    }
-
-    override def deactivate(name: String): Unit = {
-      throw new UnsupportedOperationException
-    }
-
-    override def getTopologyInfo(id: String): TopologyInfo = {
-      throw new UnsupportedOperationException
-    }
-
-    override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = {
-      throw new UnsupportedOperationException
-    }
-
-    override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions())
-
-    override def downloadChunk(id: String): ByteBuffer = {
-      throw new UnsupportedOperationException
-    }
-
-    override def beginFileUpload(): String = {
-      "local thrift server"
-    }
-
-    override def getClusterInfo: ClusterSummary = {
-      Await.result(askActor[ClusterSummary](handler, GetClusterInfo), 10 seconds)
-    }
-
-    override def finishFileUpload(location: String): Unit = {
-    }
-
-    override def uploadNewCredentials(s: String, credentials: Credentials): Unit = {
-      throw new UnsupportedOperationException
-    }
-  }
-}
-
-class GearpumpThriftServer(server: ThriftServer) extends Thread {
-
-  override def run(): Unit = {
-    server.serve()
-  }
-
-  def close(): Unit = {
-    server.stop()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
index 84d9460..dd8d782 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
@@ -18,131 +18,36 @@
 
 package io.gearpump.experiments.storm
 
-import java.io.File
-import java.util.{Map => JMap}
+import io.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient}
+import io.gearpump.util.LogUtil
+import org.slf4j.Logger
 
-import akka.actor.{Actor, ActorSystem, Props}
-import backtype.storm.Config
-import backtype.storm.generated.{ClusterSummary, StormTopology, SupervisorSummary, TopologySummary}
-import com.typesafe.config.ConfigValueFactory
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.experiments.storm.Commands.{GetClusterInfo, _}
-import io.gearpump.experiments.storm.topology.GearpumpStormTopology
-import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants}
-import io.gearpump.streaming.StreamApplication
-import io.gearpump.util.Constants._
-import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+object StormRunner {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
 
-import scala.collection.JavaConverters._
+  private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient)
 
-object StormRunner extends AkkaApp with ArgumentsParser {
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "jar" -> CLIOption[String]("<storm jar>", required = true),
-    "config" -> CLIOption[String]("<storm config path>", required = false),
-    "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)))
-
-  override val remainArgs = Array("topology_name")
-
-  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
-
-    val akkaConf = updateConfig(inputAkkaConf)
-    val config = parse(args)
-
-    val verbose = config.getBoolean("verbose")
-    if (verbose) {
-      LogUtil.verboseLogToConsole
-    }
-
-    val jar = config.getString("jar")
-    val stormConfig = config.getString("config")
-    val topology = config.remainArgs(0)
-    val stormArgs = config.remainArgs.drop(1)
-
-    val system = ActorSystem("storm", akkaConf)
-    val clientContext = new ClientContext(akkaConf, system, null)
-
-    val gearpumpNimbus = system.actorOf(Props(new Handler(clientContext, jar, stormConfig)))
-    val thriftServer = GearpumpThriftServer(gearpumpNimbus)
-    thriftServer.start()
-
-    val stormOptions = Array("-Dstorm.options=" +
-      s"${Config.NIMBUS_HOST}=127.0.0.1,${Config.NIMBUS_THRIFT_PORT}=${GearpumpThriftServer.THRIFT_PORT}",
-      "-Dstorm.jar=" + jar,
-      s"-D${PREFER_IPV4}=true"
-    )
-
-    val classPath = Array(System.getProperty("java.class.path"), jar)
-    val process = Util.startProcess(stormOptions, classPath, topology, stormArgs)
-
-    // wait till the process exit
-    val exit = process.exitValue()
-
-    thriftServer.close()
-    clientContext.close()
-    system.shutdown()
-    system.awaitTermination()
-
-    if (exit != 0) {
-      throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}")
-    }
+  private def usage: Unit = {
+    // Print the supported sub-commands to stderr in a stable (sorted) order.
+    val keys = commands.keys.toList.sorted
+    Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
   }
 
-  import Constants._
-  private def updateConfig(config: Config): Config = {
-    val storm = s"<${GEARPUMP_HOME}>/lib/storm/*"
-    val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
-    val executorClassPath = s"$storm${File.pathSeparator}" + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
-
-    val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath))
-      .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath))
-
-    if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) {
-      val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
-      updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig)
+  private def executeCommand(command : String, commandArgs : Array[String]): Unit = {
+    if (!commands.contains(command)) {
+      usage
     } else {
-      updated
+      commands(command).main(commandArgs)
     }
   }
 
-
-  class Handler(clientContext: ClientContext, jar: String, fileConfig: String) extends Actor {
-    private var applications = Map.empty[String, Int]
-    private var topologies = Map.empty[String, StormTopology]
-    private val LOG = LogUtil.getLogger(classOf[Handler])
-
-    implicit val system = context.system
-
-    def receive: Receive = {
-      case Kill(name, option) =>
-        topologies -= name
-        clientContext.shutdown(applications.getOrElse(name, throw new RuntimeException(s"topology $name not found")))
-        val appId = applications(name)
-        applications -= name
-        LOG.info(s"Killed topology $name")
-        sender ! AppKilled(name, appId)
-      case Submit(name, uploadedJarLocation, appConfig, topology, option) =>
-        topologies += name -> topology
-
-        val gearpumpStormTopology = GearpumpStormTopology(topology, appConfig, fileConfig)
-        val stormConfig = gearpumpStormTopology.getStormConfig
-        val processorGraph = GraphBuilder.build(gearpumpStormTopology)
-        val config = UserConfig.empty
-            .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
-            .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
-        val app = StreamApplication(name, processorGraph, config)
-        val appId = clientContext.submit(app, jar)
-        applications += name -> appId
-        LOG.info(s"Storm Application $appId submitted")
-        sender ! AppSubmitted(name, appId)
-      case GetClusterInfo =>
-        val topologySummaryList = topologies.map { case (name, _) =>
-          new TopologySummary(name, name, 0, 0, 0, 0, "")
-        }.toSeq
-        sender ! new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava)
-      case GetTopology(id) =>
-        sender ! topologies(id)
+  def main(args: Array[String]): Unit = {
+    if (args.isEmpty) {
+      usage
+    } else {
+      val command = args(0)
+      val commandArgs = args.drop(1)
+      executeCommand(command, commandArgs)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
new file mode 100644
index 0000000..4204580
--- /dev/null
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.experiments.storm.main
+
+import java.io.{File, FileOutputStream, FileWriter}
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, WritableByteChannel}
+import java.util.{Map => JMap, UUID}
+
+import akka.actor.ActorSystem
+import backtype.storm.Config
+import backtype.storm.generated._
+import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
+import backtype.storm.utils.TimeCacheMap.ExpiredCallback
+import backtype.storm.utils.{TimeCacheMap, Utils}
+import com.typesafe.config.ConfigValueFactory
+import io.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.experiments.storm.topology.GearpumpStormTopology
+import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil}
+import io.gearpump.streaming.StreamApplication
+import io.gearpump.util.{FileUtils, AkkaApp, Constants, LogUtil}
+import org.apache.storm.shade.org.json.simple.JSONValue
+import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
+import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+
+object GearpumpNimbus extends AkkaApp with ArgumentsParser {
+  private val THRIFT_PORT = StormUtil.getThriftPort
+  private val OUTPUT = "output"
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    OUTPUT -> CLIOption[String]("<output path for configuration file>", required = false, defaultValue = Some("app.yaml"))
+  )
+
+  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    val output = parsed.getString(OUTPUT)
+    val akkaConf = updateClientConfig(inputAkkaConf)
+    val system = ActorSystem("storm", akkaConf)
+    val clientContext = new ClientContext(akkaConf, system, null)
+    val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
+    val thriftConf: JMap[String, String] = Map(Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava
+    updateOutputStormConfig(thriftConf, output)
+    stormConf.putAll(thriftConf)
+    val thriftServer = createServer(clientContext, stormConf)
+    thriftServer.serve()
+    system.awaitTermination()
+  }
+
+  private def createServer(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = {
+    val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext, stormConf))
+    val connectionType = ThriftConnectionType.NIMBUS
+    new ThriftServer(stormConf, processor, connectionType)
+  }
+
+  private def updateOutputStormConfig(conf: JMap[String, String], output: String): Unit = {
+    // read the existing config at `output`; putAll below overwrites only the keys present in `conf`
+    val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]]
+    outputConfig.putAll(conf)
+    val yaml = new Yaml(new SafeConstructor)
+    val writer = new FileWriter(new File(output))
+    try yaml.dump(outputConfig, writer) finally writer.close()
+  }
+
+  import Constants._
+  private def updateClientConfig(config: Config): Config = {
+    val storm = s"<${GEARPUMP_HOME}>/lib/storm/*"
+    val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
+    val executorClassPath = s"$storm${File.pathSeparator}" + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
+
+    val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath))
+        .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath))
+
+    if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) {
+      val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
+      updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig)
+    } else {
+      updated
+    }
+  }
+
+}
+
+class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) extends Nimbus.Iface {
+  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
+  private var applications = Map.empty[String, Int]
+  private var topologies = Map.empty[String, (StormTopology, JMap[AnyRef, AnyRef])]
+  private val expireSeconds = StormUtil.getInt(stormConf, Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get
+  private val expiredCallback = new ExpiredCallback[String, WritableByteChannel] {
+    override def expire(k: String, v: WritableByteChannel): Unit = {
+      v.close()
+    }
+  }
+  private val fileCacheMap = new TimeCacheMap[String, WritableByteChannel](expireSeconds, expiredCallback)
+
+  override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = {
+    submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE))
+  }
+
+  override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = {
+
+    implicit val system = clientContext.system
+    val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
+    val stormConfig = gearpumpStormTopology.getStormConfig
+    val processorGraph = GraphBuilder.build(gearpumpStormTopology)
+    val config = UserConfig.empty
+          .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
+          .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
+    val app = StreamApplication(name, processorGraph, config)
+    LOG.info(s"jar file uploaded to $uploadedJarLocation")
+    val appId = clientContext.submit(app, uploadedJarLocation)
+    applications += name -> appId
+    topologies += name -> (topology, stormConfig)
+    LOG.info(s"Storm Application $appId submitted")
+  }
+
+  override def killTopologyWithOpts(name: String, options: KillOptions): Unit = {
+    clientContext.shutdown(applications.getOrElse(name, throw new RuntimeException(s"topology $name not found")))
+    applications -= name
+    topologies -= name
+    LOG.info(s"Killed topology $name")
+  }
+
+  override def getNimbusConf: String = {
+    JSONValue.toJSONString(stormConf)
+  }
+
+  override def getTopology(name: String): StormTopology = {
+    updateApps
+    topologies.getOrElse(name,
+      throw new RuntimeException(s"topology $name not found"))._1
+  }
+
+  override def getTopologyConf(name: String): String = {
+    updateApps
+    JSONValue.toJSONString(topologies.getOrElse(name,
+      throw new RuntimeException(s"topology $name not found"))._2)
+  }
+
+  override def getUserTopology(id: String): StormTopology = getTopology(id)
+
+  override def beginFileUpload(): String = {
+    val location = s"stormjar-${UUID.randomUUID()}.jar"
+    val channel = Channels.newChannel(new FileOutputStream(location))
+    fileCacheMap.put(location, channel)
+    LOG.info(s"Uploading file from client to $location")
+    location
+  }
+
+  override def uploadChunk(location: String, chunk: ByteBuffer): Unit = {
+    if (!fileCacheMap.containsKey(location)) {
+      throw new RuntimeException(s"File for $location does not exist (or timed out)")
+    } else {
+      val channel = fileCacheMap.get(location)
+      channel.write(chunk)
+      fileCacheMap.put(location, channel)
+    }
+  }
+
+  override def finishFileUpload(location: String): Unit = {
+    if (!fileCacheMap.containsKey(location)) {
+      throw new RuntimeException(s"File for $location does not exist (or timed out)")
+    } else {
+      val channel = fileCacheMap.get(location)
+      channel.close()
+      fileCacheMap.remove(location)
+      new File(location).delete()
+    }
+  }
+
+  override def getClusterInfo: ClusterSummary = {
+    updateApps
+    val topologySummaryList = topologies.map { case (name, _) =>
+      new TopologySummary(name, name, 0, 0, 0, 0, "")
+    }.toSeq
+    new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava)
+  }
+
+  override def beginFileDownload(file: String): String = {
+    throw new UnsupportedOperationException
+  }
+
+  override def uploadNewCredentials(s: String, credentials: Credentials): Unit = {
+    throw new UnsupportedOperationException
+  }
+    override def activate(name: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def rebalance(name: String, options: RebalanceOptions): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def deactivate(name: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getTopologyInfo(name: String): TopologyInfo = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = {
+    throw new UnsupportedOperationException
+  }
+
+  override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions())
+
+  override def downloadChunk(name: String): ByteBuffer = {
+    throw new UnsupportedOperationException
+  }
+
+  private def updateApps: Unit = {
+    clientContext.listApps.appMasters.foreach { app =>
+      val name = app.appName
+      if (applications.contains(app.appName)) {
+        if (app.status != MasterToAppMaster.AppMasterActive) {
+          applications -= name
+          topologies -= name
+        }
+      }
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
new file mode 100644
index 0000000..70efbf3
--- /dev/null
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.experiments.storm.main
+
+import backtype.storm.Config
+import backtype.storm.utils.Utils
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.util.Constants._
+import io.gearpump.util.{AkkaApp, LogUtil, Util}
+
+object GearpumpStormClient extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "jar" -> CLIOption[String]("<storm jar>", required = true),
+    "config" -> CLIOption[String]("<storm config file>", required = true),
+    "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)))
+
+  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+
+    val verbose = config.getBoolean("verbose")
+    if (verbose) {
+      LogUtil.verboseLogToConsole
+    }
+
+    val jar = config.getString("jar")
+    val stormConfig = config.getString("config")
+    val topology = config.remainArgs(0)
+    val stormArgs = config.remainArgs.drop(1)
+    val stormOptions = Array(
+      s"-Dstorm.options=${getThriftOptions(stormConfig)}",
+      s"-Dstorm.jar=$jar",
+      s"-Dstorm.config.file=$stormConfig",
+      s"-D${PREFER_IPV4}=true"
+    )
+
+    val classPath = Array(s"${System.getProperty(GEARPUMP_HOME)}/lib/storm/*", jar)
+    val process = Util.startProcess(stormOptions, classPath, topology, stormArgs)
+
+    // wait till the process exit
+    val exit = process.exitValue()
+
+    if (exit != 0) {
+      throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}")
+    }
+  }
+
+  private def getThriftOptions(stormConfig: String): String = {
+    val config = Utils.findAndReadConfigFile(stormConfig, true)
+    val thriftPort = config.get(Config.NIMBUS_THRIFT_PORT)
+    s"${Config.NIMBUS_HOST}=127.0.0.1,${Config.NIMBUS_THRIFT_PORT}=$thriftPort"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
index 996498b..b88a200 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
@@ -18,7 +18,6 @@
 
 package io.gearpump.experiments.storm.topology
 
-import java.io._
 import java.lang.{Iterable => JIterable}
 import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
 
@@ -35,8 +34,6 @@ import io.gearpump.experiments.storm.util.StormUtil._
 import io.gearpump.streaming.Processor
 import io.gearpump.streaming.task.Task
 import io.gearpump.util.LogUtil
-import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
-import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor
 import org.slf4j.Logger
 
 import scala.collection.JavaConversions._
@@ -45,43 +42,18 @@ object GearpumpStormTopology {
   private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology])
 
   def apply(
+      name: String,
       topology: StormTopology,
-      appConfigInJson: String,
-      fileConfig: String)(implicit system: ActorSystem): GearpumpStormTopology = {
+      appConfigInJson: String)(implicit system: ActorSystem): GearpumpStormTopology = {
     new GearpumpStormTopology(
+      name,
       topology,
       Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]],
-      parseJsonStringToMap(appConfigInJson),
-      readStormConfig(fileConfig)
+      parseJsonStringToMap(appConfigInJson)
     )
 
   }
 
-  /**
-   * @param configFile user provided local config file
-   * @return a config Map loaded from config file
-   */
-  private def readStormConfig(configFile: String): JMap[AnyRef, AnyRef] = {
-    var ret: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
-    try {
-      val yaml = new Yaml(new SafeConstructor)
-      val input: InputStream = new FileInputStream(configFile)
-      try {
-        ret = yaml.load(new InputStreamReader(input)).asInstanceOf[JMap[AnyRef, AnyRef]]
-      } catch {
-        case e: IOException =>
-          LOG.warn(s"failed to load config file $configFile")
-      } finally {
-        input.close()
-      }
-    } catch {
-      case e: FileNotFoundException =>
-        LOG.warn(s"failed to find config file $configFile")
-      case t: Throwable =>
-        LOG.error(t.getMessage)
-    }
-    ret
-  }
 }
 
 /**
@@ -91,21 +63,20 @@ object GearpumpStormTopology {
  *   3. provides interface for Gearpump applications to use Storm topology
  *
  * an implicit ActorSystem is required to create Gearpump processors
+ * @param name topology name
  * @param topology Storm topology
- * @param sysConfig configs from "defaults.yaml" and "storm.yaml"
+ * @param sysConfig configs from "defaults.yaml" and custom config file
  * @param appConfig config submitted from user application
- * @param fileConfig custom file config set by user in command line
  */
 private[storm] class GearpumpStormTopology(
+    name: String,
     topology: StormTopology,
     sysConfig: JMap[AnyRef, AnyRef],
-    appConfig: JMap[AnyRef, AnyRef],
-    fileConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) {
-  import io.gearpump.experiments.storm.topology.GearpumpStormTopology._
+    appConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) {
 
   private val spouts = topology.get_spouts()
   private val bolts = topology.get_bolts()
-  private val stormConfig = mergeConfigs(sysConfig, appConfig, fileConfig, getComponentConfigs(spouts, bolts))
+  private val stormConfig = mergeConfigs(sysConfig, appConfig, getComponentConfigs(spouts, bolts))
   private val spoutProcessors = spouts.map { case (id, spout) =>
     id -> spoutToProcessor(id, spout, stormConfig.toMap) }.toMap
   private val boltProcessors = bolts.map { case (id, bolt) =>
@@ -114,7 +85,7 @@ private[storm] class GearpumpStormTopology(
 
   /**
    * @return merged Storm config with priority
-   *    defaults.yaml < storm.yaml < application config < component config < custom file config
+   *    defaults.yaml < custom file config < application config < component config
    */
   def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig
 
@@ -139,13 +110,12 @@ private[storm] class GearpumpStormTopology(
   private def mergeConfigs(
       sysConfig: JMap[AnyRef, AnyRef],
       appConfig: JMap[AnyRef, AnyRef],
-      fileConfig: JMap[AnyRef, AnyRef],
       componentConfigs: Iterable[JMap[AnyRef, AnyRef]]): JMap[AnyRef, AnyRef] = {
     val allConfig = new JHashMap[AnyRef, AnyRef]
     allConfig.putAll(sysConfig)
     allConfig.putAll(appConfig)
     allConfig.putAll(getMergedComponentConfig(componentConfigs, allConfig.toMap))
-    allConfig.putAll(fileConfig)
+    allConfig.put(Config.TOPOLOGY_NAME, name)
     allConfig
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala
deleted file mode 100644
index 20b63b3..0000000
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import backtype.storm.generated.StormTopology
-import io.gearpump.experiments.storm.util.TopologyUtil
-import io.gearpump.cluster.TestUtil
-import Commands.{GetTopology, Kill, Submit}
-import GearpumpThriftServer.GearpumpNimbus
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, WordSpec}
-
-import scala.concurrent.Future
-
-class GearpumpNimbusSpec extends WordSpec with Matchers with MockitoSugar {
-
-  "GearpumpNimbus" should {
-    "submit and kill topology through ClientContext" in {
-
-      implicit val system = ActorSystem("storm-test", TestUtil.DEFAULT_CONFIG)
-      implicit val dispatcher = system.dispatcher
-
-      val handler = TestProbe()
-      val gearpumpNimbus = new GearpumpNimbus(handler.ref)
-
-      val appId = 0
-      val name = "test"
-      val uploadedJarLocation = "local"
-      val jsonConf = "storm_json_conf"
-      val topology = TopologyUtil.getTestTopology
-
-      Future(gearpumpNimbus.submitTopology(name, uploadedJarLocation, jsonConf, topology))
-      handler.expectMsgType[Submit]
-
-      Future(gearpumpNimbus.getTopology(name))
-      handler.expectMsgType[GetTopology]
-      handler.reply(new StormTopology)
-
-      Future(gearpumpNimbus.getUserTopology(name))
-      handler.expectMsgType[GetTopology]
-      handler.reply(new StormTopology)
-
-      Future(gearpumpNimbus.killTopology(name))
-      handler.expectMsgType[Kill]
-
-      system.shutdown()
-      system.awaitTermination()
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala
deleted file mode 100644
index b1736f0..0000000
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm
-
-import backtype.storm.security.auth.ThriftServer
-import org.mockito.Mockito._
-import org.scalatest.WordSpec
-import org.scalatest.mock.MockitoSugar
-
-class GearpumpThriftServerSpec extends WordSpec with MockitoSugar {
-
-  "GearpumpThriftServer" should {
-    "run ThriftServer.serve" in {
-      val tServer = mock[ThriftServer]
-      val thriftServer = new GearpumpThriftServer(tServer)
-
-      thriftServer.run()
-
-      verify(tServer).serve()
-    }
-
-    "stop ThriftServer" in {
-      val tServer = mock[ThriftServer]
-      val thriftServer = new GearpumpThriftServer(tServer)
-
-      thriftServer.close()
-
-      verify(tServer).stop()
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala
deleted file mode 100644
index 95cf279..0000000
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package io.gearpump.experiments.storm
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.experiments.storm.Commands.{AppKilled, AppSubmitted, Kill, Submit}
-import io.gearpump.experiments.storm.StormRunner.Handler
-import io.gearpump.experiments.storm.util.TopologyUtil
-import org.mockito.Matchers._
-import org.mockito.Mockito
-import org.scalatest.{FlatSpec, Matchers}
-
-class StormRunnerSpec extends FlatSpec with Matchers {
-  it should "handle submit/kill correctly"  in {
-    val conf = TestUtil.DEFAULT_CONFIG
-    implicit val system = ActorSystem("storm-test", conf)
-
-    val uploadedJarLocation = "local"
-    val jsonConf = "storm_json_conf"
-    val topology = TopologyUtil.getTestTopology
-
-    implicit val dispatcher = system.dispatcher
-    val clientContext = Mockito.mock(classOf[ClientContext])
-    Mockito.when(clientContext.submit(any(), any())).thenReturn(0)
-    val handler = system.actorOf(Props(new Handler(clientContext, "jar", "user_config")))
-    val client = TestProbe()
-
-    client.send(handler, Submit("app", uploadedJarLocation, jsonConf, topology, null))
-    client.expectMsg(AppSubmitted("app", 0))
-
-
-    client.send(handler, Kill("app", null))
-    client.expectMsg(AppKilled("app", 0))
-
-    system.shutdown()
-    system.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
index d1c330b..8f10886 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -20,6 +20,7 @@ package io.gearpump.experiments.storm.topology
 
 import java.util.{HashMap => JHashMap, Map => JMap}
 
+import backtype.storm.Config
 import io.gearpump.experiments.storm.processor.StormProcessor
 import io.gearpump.experiments.storm.producer.StormProducer
 import io.gearpump.experiments.storm.util.TopologyUtil
@@ -40,28 +41,27 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
       val sysConfig = newJavaConfig(name, sysVal)
       val appVal = "app"
       val appConfig = newJavaConfig(name, appVal)
-      val fileVal = "file"
-      val fileConfig = newJavaConfig(name, fileVal)
 
       implicit val system = MockUtil.system
-      val topology1 = new GearpumpStormTopology(stormTopology, newEmptyConfig, newEmptyConfig, newEmptyConfig)
-      topology1.getStormConfig shouldBe empty
+      val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, newEmptyConfig)
+      topology1.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology1"
+      topology1.getStormConfig should not contain name
 
-      val topology2 = new GearpumpStormTopology(stormTopology, sysConfig, newEmptyConfig, newEmptyConfig)
+      val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, newEmptyConfig)
+      topology2.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology2"
       topology2.getStormConfig.get(name) shouldBe sysVal
 
-      val topology3 = new GearpumpStormTopology(stormTopology, sysConfig, appConfig, newEmptyConfig)
+      val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig)
+      topology3.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology3"
       topology3.getStormConfig.get(name) shouldBe appVal
 
-      val topology4 = new GearpumpStormTopology(stormTopology, sysConfig, appConfig, fileConfig)
-      topology4.getStormConfig.get(name) shouldBe fileVal
     }
 
     "create Gearpump processors from Storm topology" in {
       val stormTopology = TopologyUtil.getTestTopology
       implicit val system = MockUtil.system
       val gearpumpStormTopology =
-        GearpumpStormTopology(stormTopology, null, "")
+        GearpumpStormTopology("app", stormTopology, null)
       val processors = gearpumpStormTopology.getProcessors
       stormTopology.get_spouts().foreach { case (spoutId, _) =>
         val processor = processors(spoutId)
@@ -80,7 +80,7 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
       implicit val system = MockUtil.system
       val sysConfig = new JHashMap[AnyRef, AnyRef]
       val gearpumpStormTopology =
-        GearpumpStormTopology(stormTopology, null, "")
+        GearpumpStormTopology("app", stormTopology, null)
       val targets0 = gearpumpStormTopology.getTargets("1")
       targets0 should contain key "3"
       targets0 should contain key "4"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
index ed792f4..15acf4e 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
@@ -20,7 +20,6 @@ package io.gearpump.integrationtest.storm
 
 
 import backtype.storm.utils.{Utils, DRPCClient}
-import io.gearpump.experiments.storm.GearpumpThriftServer
 import io.gearpump.integrationtest.Docker
 import io.gearpump.integrationtest.minicluster.BaseContainer
 import org.apache.log4j.Logger
@@ -30,23 +29,28 @@ class StormClient(masterAddrs: Seq[(String, Int)]) {
 
   private val LOG = Logger.getLogger(getClass)
   private val STORM_HOST = "storm0"
-  private val STORM_CMD = "/opt/start storm"
+  private val STORM_NIMBUS = "/opt/start storm nimbus"
+  private val STORM_APP = "/opt/start storm app"
   private val STORM_DRPC = "storm-drpc"
   private val CONFIG_FILE = "storm.yaml"
-  private val NIMBUS_THRIFT_PORT = GearpumpThriftServer.THRIFT_PORT
   private val DRPC_PORT = 3772
   private val DRPC_INVOCATIONS_PORT = 3773
 
   private val container = new BaseContainer(STORM_HOST, STORM_DRPC, masterAddrs,
-    tunnelPorts = Set(NIMBUS_THRIFT_PORT, DRPC_PORT, DRPC_INVOCATIONS_PORT))
+    tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
 
   def start(): Unit = {
     container.createAndStart()
+    startNimbus
+  }
+
+  private def startNimbus: String = {
+    Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE")
   }
 
   def submitStormApp(jar: String, mainClass: String, args: String = ""): Int = {
     try {
-      Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_CMD -config $CONFIG_FILE " +
+      Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_APP -config $CONFIG_FILE " +
         s"-jar $jar $mainClass $args").split("\n")
         .filter(_.contains("The application id is ")).head.split(" ").last.toInt
     } catch {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index b242b36..a3852c4 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -59,7 +59,7 @@ object Pack extends sbt.Build {
             "worker" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-DlogFilename=worker", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"),
             "services" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"),
             "yarnclient" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"),
-            "storm" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost")
+            "storm" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}")
           ),
           packLibDir := Map(
             "lib" -> new ProjectsToPack(core.id, streaming.id),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
index e2b2c47..aa90726 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
@@ -221,7 +221,7 @@ object MasterService {
    */
   def submitStormApp(jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = {
     submitAndDeleteTempFiles(
-      "io.gearpump.experiments.storm.StormRunner",
+      "io.gearpump.experiments.storm.main.GearpumpStormClient",
       argsArray = spaceSeparatedArgumentsToArray(args),
       fileMap = Map("jar" -> jar, "config" -> stormConf).filter(_._2.isDefined).mapValues(_.get),
       classPath = getStormApplicationClassPath,