You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:35 UTC

[25/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
deleted file mode 100644
index 554210c..0000000
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
-import java.lang.{Boolean => JBoolean}
-import java.util.{HashMap => JHashMap, Map => JMap}
-
-import akka.actor.ActorSystem
-import backtype.storm.Config
-import backtype.storm.generated._
-import org.apache.storm.shade.org.json.simple.JSONValue
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
-import io.gearpump.experiments.storm.topology._
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.streaming.task.{TaskContext, TaskId}
-import io.gearpump.util.Util
-
-object StormUtil {
-
-  /**
-   * Convert storm task id to gearpump [[io.gearpump.streaming.task.TaskId]]
-   *
-   * The high 16 bit of an Int is TaskId.processorId
-   * The low 16 bit of an Int is TaskId.index
-   */
-  def stormTaskIdToGearpump(id: Integer): TaskId = {
-    val index = id & 0xFFFF
-    val processorId = id >> 16
-    TaskId(processorId, index)
-  }
-
-  /**
-   * convert gearpump [[TaskId]] to storm task id
-   * TaskId.processorId is the high 16 bit of an Int
-   * TaskId.index is the low 16 bit  of an Int
-   */
-  def gearpumpTaskIdToStorm(taskId: TaskId): Integer = {
-    val index = taskId.index
-    val processorId = taskId.processorId
-    (processorId << 16) + (index & 0xFFFF)
-  }
-
-  /**
-   * @return a configured [[GearpumpStormComponent]]
-   */
-  def getGearpumpStormComponent(
-      taskContext: TaskContext, conf: UserConfig)(implicit system: ActorSystem)
-    : GearpumpStormComponent = {
-    val topology = conf.getValue[StormTopology](STORM_TOPOLOGY).get
-    val stormConfig = conf.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get
-    val componentId = conf.getString(STORM_COMPONENT).get
-    val spouts = topology.get_spouts
-    val bolts = topology.get_bolts
-    if (spouts.containsKey(componentId)) {
-      GearpumpSpout(topology, stormConfig, spouts.get(componentId), taskContext)
-    } else if (bolts.containsKey(componentId)) {
-      GearpumpBolt(topology, stormConfig, bolts.get(componentId), taskContext)
-    } else {
-      throw new Exception(s"storm component $componentId not found")
-    }
-  }
-
-  /**
-   * Parses config in json to map, returns empty map for invalid json string
-   *
-   * @param json config in json
-   * @return config in map
-   */
-  def parseJsonStringToMap(json: String): JMap[AnyRef, AnyRef] = {
-    Option(json).flatMap(json => JSONValue.parse(json) match {
-      case m: JMap[_, _] => Option(m.asInstanceOf[JMap[AnyRef, AnyRef]])
-      case _ => None
-    }).getOrElse(new JHashMap[AnyRef, AnyRef])
-  }
-
-  /**
-   * get Int value of the config name
-   */
-  def getInt(conf: JMap[_, _], name: String): Option[Int] = {
-    Option(conf.get(name)).map {
-      case number: Number => number.intValue
-      case invalid => throw new IllegalArgumentException(
-        s"$name must be Java Integer; actual: ${invalid.getClass}")
-    }
-  }
-
-  /**
-   * get Boolean value of the config name
-   */
-  def getBoolean(conf: JMap[_, _], name: AnyRef): Option[Boolean] = {
-    Option(conf.get(name)).map {
-      case b: JBoolean => b.booleanValue()
-      case invalid => throw new IllegalArgumentException(
-        s"$name must be a Java Boolean; acutal: ${invalid.getClass}")
-    }
-  }
-
-  /**
-   * clojure mod ported from Storm
-   * see https://clojuredocs.org/clojure.core/mod
-   */
-  def mod(num: Int, div: Int): Int = {
-    (num % div + div) % div
-  }
-
-  def ackEnabled(config: JMap[AnyRef, AnyRef]): Boolean = {
-    if (config.containsKey(Config.TOPOLOGY_ACKER_EXECUTORS)) {
-      getInt(config, Config.TOPOLOGY_ACKER_EXECUTORS).map(_ != 0).getOrElse(true)
-    } else {
-      false
-    }
-  }
-
-  def getThriftPort(): Int = {
-    Util.findFreePort().getOrElse(
-      throw new Exception("unable to find free port for thrift server"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala
new file mode 100644
index 0000000..51760ba
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm
+
+import org.apache.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient}
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+/**
+ * Command line dispatcher for the storm experiment.
+ *
+ * The first argument selects a sub-command ("nimbus" or "app"); the remaining arguments are
+ * forwarded untouched to that sub-command's `main`. A missing or unknown sub-command prints
+ * the usage line to stderr.
+ */
+object StormRunner {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  // sub-command name -> application object that handles it
+  private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient)
+
+  /** Prints the alphabetically sorted sub-command names to stderr. */
+  private def usage(): Unit = {
+    val names = commands.keys.toList.sorted.mkString("|")
+    // scalastyle:off println
+    Console.err.println("Usage: " + "<" + names + ">")
+    // scalastyle:on println
+  }
+
+  /** Runs the sub-command registered under `command`, or prints usage when there is none. */
+  private def executeCommand(command: String, commandArgs: Array[String]): Unit = {
+    commands.get(command) match {
+      case Some(app) => app.main(commandArgs)
+      case None => usage()
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    args.headOption match {
+      case Some(command) => executeCommand(command, args.tail)
+      case None => usage()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
new file mode 100644
index 0000000..544a4eb
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.main
+
+import java.io.{File, FileOutputStream, FileWriter}
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, WritableByteChannel}
+import java.util.{HashMap => JHashMap, Map => JMap, UUID}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+import akka.actor.ActorSystem
+import com.typesafe.config.ConfigValueFactory
+import backtype.storm.Config
+import backtype.storm.generated._
+import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
+import backtype.storm.utils.Utils
+import org.apache.storm.shade.org.json.simple.JSONValue
+import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology
+import org.apache.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback
+import org.apache.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil, TimeCacheMapWrapper}
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
+
+/**
+ * Launcher of the Storm-compatible Nimbus Thrift service.
+ *
+ * On start it picks a Thrift port, records the Nimbus host and port in the output YAML file
+ * (option "output", default "app.yaml"), starts the Thrift server in the background and then
+ * blocks until the actor system terminates.
+ */
+object GearpumpNimbus extends AkkaApp with ArgumentsParser {
+  private val THRIFT_PORT = StormUtil.getThriftPort()
+  private val OUTPUT = "output"
+  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    OUTPUT -> CLIOption[String]("<output path for configuration file>",
+      required = false, defaultValue = Some("app.yaml"))
+  )
+
+  /**
+   * @param inputAkkaConf configuration the client config and actor system are derived from
+   * @param args command line arguments, parsed against [[options]]
+   */
+  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    val output = parsed.getString(OUTPUT)
+    val akkaConf = updateClientConfig(inputAkkaConf)
+    val system = ActorSystem("storm", akkaConf)
+
+    val clientContext = new ClientContext(akkaConf, system, null)
+    val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
+    val thriftConf: JMap[AnyRef, AnyRef] = Map(
+      Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME),
+      Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava.asInstanceOf[JMap[AnyRef, AnyRef]]
+    // The Nimbus address is persisted to the output file and also overrides the values read
+    // from the storm config
+    updateStormConfig(thriftConf, output)
+    stormConf.putAll(thriftConf)
+
+    import scala.concurrent.ExecutionContext.Implicits.global
+    val serving = Future {
+      val thriftServer = createServer(clientContext, stormConf)
+      thriftServer.serve()
+    }
+    // A failure to create or run the server would otherwise vanish with the Future while
+    // this method keeps waiting below; at least make it visible in the log
+    serving.failed.foreach { e =>
+      LOG.error("Storm thrift server terminated with an error", e)
+    }
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  /** Builds a Nimbus Thrift server whose requests are handled by a new [[GearpumpNimbus]]. */
+  private def createServer(
+      clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = {
+    val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext,
+      stormConf))
+    val connectionType = ThriftConnectionType.NIMBUS
+    new ThriftServer(stormConf, processor, connectionType)
+  }
+
+  /**
+   * Merges `thriftConfig` over whatever is read from the `output` config file and writes
+   * the result back to `output` as a YAML map. Entries of `thriftConfig` win on conflict.
+   */
+  private def updateStormConfig(thriftConfig: JMap[AnyRef, AnyRef], output: String): Unit = {
+    val updatedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]]
+    updatedConfig.putAll(outputConfig)
+    updatedConfig.putAll(thriftConfig)
+    val yaml = new Yaml
+    val serialized = yaml.dumpAsMap(updatedConfig)
+    val writer = new FileWriter(new File(output))
+    try {
+      writer.write(serialized)
+    } finally {
+      // always release the file handle; a write failure still propagates to the caller
+      writer.close()
+    }
+  }
+
+  import org.apache.gearpump.util.Constants._
+
+  /**
+   * Returns a copy of `config` in which the storm library path is prepended to the extra
+   * classpath of both AppMaster and executor. When `config` defines
+   * StormConstants.STORM_SERIALIZATION_FRAMEWORK its value is also copied to
+   * GEARPUMP_SERIALIZER_POOL.
+   */
+  private def updateClientConfig(config: Config): Config = {
+    // "<...>" is kept literally here; presumably it is substituted with the real home
+    // directory where the classpath is consumed -- TODO confirm
+    val storm = s"<${GEARPUMP_HOME}>/lib/storm/*"
+    val appClassPath = s"$storm${File.pathSeparator}" +
+      config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
+    val executorClassPath = s"$storm${File.pathSeparator}" +
+      config.getString(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
+
+    val updated = config
+      .withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath))
+      .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH,
+        ConfigValueFactory.fromAnyRef(executorClassPath))
+
+    if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) {
+      val serializerConfig = ConfigValueFactory.fromAnyRef(
+        config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
+      updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig)
+    } else {
+      updated
+    }
+  }
+}
+
+/**
+ * Storm Nimbus Thrift interface implemented on top of a Gearpump client.
+ *
+ * Supported operations: topology submission, kill, topology/config lookup, cluster summary
+ * and jar upload. All other operations throw UnsupportedOperationException.
+ *
+ * NOTE(review): `applications` and `topologies` are plain vars updated without any
+ * synchronization -- confirm how many threads the Thrift server uses to call this handler.
+ *
+ * @param clientContext used to submit, list and shut down Gearpump applications
+ * @param stormConf storm configuration; must define Config.NIMBUS_FILE_COPY_EXPIRATION_SECS
+ */
+class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef])
+  extends Nimbus.Iface {
+
+  import org.apache.gearpump.experiments.storm.main.GearpumpNimbus._
+
+  // topology name -> Gearpump application id
+  private var applications = Map.empty[String, Int]
+  // topology name -> topology, its config and the location of its uploaded jar
+  private var topologies = Map.empty[String, TopologyData]
+  // Fail with a message naming the missing key instead of a bare NoSuchElementException
+  private val expireSeconds = StormUtil.getInt(stormConf,
+    Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).getOrElse(
+    throw new IllegalArgumentException(
+      s"${Config.NIMBUS_FILE_COPY_EXPIRATION_SECS} is not set in the storm config"))
+  // Closes the channel of an upload entry handed to the callback (presumably on expiry)
+  private val expiredCallback = new Callback[String, WritableByteChannel] {
+    override def expire(k: String, v: WritableByteChannel): Unit = {
+      v.close()
+    }
+  }
+  // upload location -> open channel of the jar being uploaded
+  private val fileCacheMap = new TimeCacheMapWrapper[String, WritableByteChannel](expireSeconds,
+    expiredCallback)
+
+  override def submitTopology(
+      name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology)
+    : Unit = {
+    submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology,
+      new SubmitOptions(TopologyInitialStatus.ACTIVE))
+  }
+
+  /**
+   * Translates the Storm topology into a Gearpump [[StreamApplication]], submits it together
+   * with the uploaded jar and remembers it under `name`. `options` is not used.
+   */
+  override def submitTopologyWithOpts(
+      name: String, uploadedJarLocation: String,
+      jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = {
+    LOG.info(s"Submitted topology $name")
+    implicit val system = clientContext.system
+    val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
+    val stormConfig = gearpumpStormTopology.getStormConfig
+    // default to a single worker when the topology does not configure TOPOLOGY_WORKERS
+    val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS).getOrElse(1)
+    val processorGraph = GraphBuilder.build(gearpumpStormTopology)
+    val config = UserConfig.empty
+      .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
+      .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
+    val app = StreamApplication(name, processorGraph, config)
+    LOG.info(s"jar file uploaded to $uploadedJarLocation")
+    val appId = clientContext.submit(app, uploadedJarLocation, workerNum)
+    applications += name -> appId
+    topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation)
+    LOG.info(s"Storm Application $appId submitted")
+  }
+
+  /**
+   * Shuts down the application registered under `name` and forgets the topology.
+   * `options` is not used.
+   *
+   * @throws RuntimeException if no topology with this name was submitted through this instance
+   */
+  override def killTopologyWithOpts(name: String, options: KillOptions): Unit = {
+    applications.get(name) match {
+      case Some(appId) =>
+        clientContext.shutdown(appId)
+        removeTopology(name)
+        LOG.info(s"Killed topology $name")
+      case None =>
+        throw new RuntimeException(s"topology $name not found")
+    }
+  }
+
+  override def getNimbusConf: String = {
+    JSONValue.toJSONString(stormConf)
+  }
+
+  override def getTopology(name: String): StormTopology = {
+    topologyData(name).topology
+  }
+
+  override def getTopologyConf(name: String): String = {
+    JSONValue.toJSONString(topologyData(name).config)
+  }
+
+  override def getUserTopology(id: String): StormTopology = getTopology(id)
+
+  /**
+   * Creates a temp jar file, opens a channel to it and registers the channel for the
+   * following uploadChunk / finishFileUpload calls.
+   *
+   * @return absolute path of the temp file, used as the upload handle
+   */
+  override def beginFileUpload(): String = {
+    val file = File.createTempFile(s"storm-jar-${UUID.randomUUID()}", ".jar")
+    val location = file.getAbsolutePath
+    val channel = Channels.newChannel(new FileOutputStream(location))
+    fileCacheMap.put(location, channel)
+    LOG.info(s"Uploading file from client to $location")
+    location
+  }
+
+  override def uploadChunk(location: String, chunk: ByteBuffer): Unit = {
+    if (!fileCacheMap.containsKey(location)) {
+      throw new RuntimeException(s"File for $location does not exist (or timed out)")
+    } else {
+      val channel = fileCacheMap.get(location)
+      channel.write(chunk)
+      // re-insert the entry; presumably this refreshes its expiration -- TODO confirm
+      fileCacheMap.put(location, channel)
+    }
+  }
+
+  override def finishFileUpload(location: String): Unit = {
+    if (!fileCacheMap.containsKey(location)) {
+      throw new RuntimeException(s"File for $location does not exist (or timed out)")
+    } else {
+      val channel = fileCacheMap.get(location)
+      try {
+        channel.close()
+      } finally {
+        // drop the entry even when close() fails so the finished upload does not linger
+        fileCacheMap.remove(location)
+      }
+    }
+  }
+
+  /** Lists the known topologies by name; all numeric fields of the summary are zero. */
+  override def getClusterInfo: ClusterSummary = {
+    updateApps()
+    val topologySummaryList = topologies.map { case (name, _) =>
+      new TopologySummary(name, name, 0, 0, 0, 0, "")
+    }.toSeq
+    new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava)
+  }
+
+  override def beginFileDownload(file: String): String = {
+    throw new UnsupportedOperationException
+  }
+
+  override def uploadNewCredentials(s: String, credentials: Credentials): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def activate(name: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def rebalance(name: String, options: RebalanceOptions): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def deactivate(name: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getTopologyInfo(name: String): TopologyInfo = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = {
+    throw new UnsupportedOperationException
+  }
+
+  override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions())
+
+  override def downloadChunk(name: String): ByteBuffer = {
+    throw new UnsupportedOperationException
+  }
+
+  /**
+   * Refreshes the bookkeeping and returns the data kept for topology `name`.
+   *
+   * @throws RuntimeException if the topology is unknown or no longer active
+   */
+  private def topologyData(name: String): TopologyData = {
+    updateApps()
+    topologies.getOrElse(name, throw new RuntimeException(s"topology $name not found"))
+  }
+
+  /** Forgets every tracked topology whose application is reported as not active. */
+  private def updateApps(): Unit = {
+    clientContext.listApps.appMasters.foreach { app =>
+      val name = app.appName
+      if (applications.contains(name)) {
+        if (app.status != MasterToAppMaster.AppMasterActive) {
+          removeTopology(name)
+        }
+      }
+    }
+  }
+
+  /** Drops `name` from both maps and deletes its uploaded jar, if the topology is known. */
+  private def removeTopology(name: String): Unit = {
+    applications -= name
+    // tolerate a missing entry rather than failing with NoSuchElementException
+    topologies.get(name).foreach { data =>
+      new File(data.jar).delete()
+    }
+    topologies -= name
+  }
+}
+
+/**
+ * What is remembered about a submitted Storm topology.
+ *
+ * @param topology the Storm topology as submitted
+ * @param config the storm configuration of the topology
+ * @param jar path of the jar file that belongs to the topology
+ */
+case class TopologyData(
+    topology: StormTopology,
+    config: JMap[AnyRef, AnyRef],
+    jar: String)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala
new file mode 100644
index 0000000..1cfd5a4
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.main
+
+import backtype.storm.Config
+import backtype.storm.utils.Utils
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{AkkaApp, LogUtil, Util}
+
+/**
+ * Submits a Storm topology by launching it in a child process.
+ *
+ * Options: "jar" (storm jar, required), "config" (storm config file, required) and
+ * "verbose". The first remaining argument is the topology to start; the arguments after it
+ * are passed on to the topology.
+ */
+object GearpumpStormClient extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "jar" -> CLIOption[String]("<storm jar>", required = true),
+    // the value is a file path and is read back with getString, so declare it as String
+    "config" -> CLIOption[String]("<storm config file>", required = true),
+    "verbose" -> CLIOption("<print verbose log on console>", required = false,
+      defaultValue = Some(false)))
+
+  /**
+   * @throws IllegalArgumentException if no topology is given after the options
+   * @throws Exception if the child process exits with a non-zero code
+   */
+  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+
+    val verbose = config.getBoolean("verbose")
+    if (verbose) {
+      LogUtil.verboseLogToConsole()
+    }
+
+    val jar = config.getString("jar")
+    val stormConfig = config.getString("config")
+    // report a missing topology explicitly rather than with an index out of bounds error
+    val topology = config.remainArgs.headOption.getOrElse(
+      throw new IllegalArgumentException(
+        "no topology specified; expected <topology> [args...] after the options"))
+    val stormArgs = config.remainArgs.drop(1)
+    val stormOptions = Array(
+      s"-Dstorm.options=${getThriftOptions(stormConfig)}",
+      s"-Dstorm.jar=$jar",
+      s"-Dstorm.conf.file=$stormConfig",
+      s"-D${PREFER_IPV4}=true"
+    )
+
+    val gearpumpHome = System.getProperty(GEARPUMP_HOME)
+    val classPath = Array(s"$gearpumpHome/lib/*", s"$gearpumpHome/lib/storm/*", jar)
+    val process = Util.startProcess(stormOptions, classPath, topology, stormArgs)
+
+    // Waits till the process exit
+    val exit = process.exitValue()
+
+    if (exit != 0) {
+      throw new Exception(s"failed to submit jar, exit code $exit, " +
+        s"error summary: ${process.logger.error}")
+    }
+  }
+
+  /**
+   * Reads the Nimbus host and Thrift port from the storm config file and renders them as
+   * "key=value,key=value" for the storm.options system property.
+   */
+  private def getThriftOptions(stormConfig: String): String = {
+    val config = Utils.findAndReadConfigFile(stormConfig, true)
+    val host = config.get(Config.NIMBUS_HOST)
+    val thriftPort = config.get(Config.NIMBUS_THRIFT_PORT)
+    s"${Config.NIMBUS_HOST}=$host,${Config.NIMBUS_THRIFT_PORT}=$thriftPort"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
new file mode 100644
index 0000000..aaa0a99
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
+import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner}
+
+/**
+ * Partitioner tied to one target Storm component.
+ *
+ * Gearpump delivers a message from a task to all of its subscribers, whereas Storm delivers
+ * it to one or more of them. Until Gearpump supports the Storm way, the real partitioning is
+ * done up front in [[org.apache.gearpump.experiments.storm.util.StormOutputCollector]], which
+ * stores the result in "targetPartitions" of
+ * [[org.apache.gearpump.experiments.storm.topology.GearpumpTuple]]. This class only looks up
+ * the partitions recorded there for its target.
+ *
+ * @param target target storm component id
+ */
+private[storm] class StormPartitioner(target: String) extends MulticastPartitioner {
+
+  /**
+   * @return the partitions recorded in the tuple for `target`, or a single
+   *         Partitioner.UNKNOWN_PARTITION_ID when the tuple has none for it
+   */
+  override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int)
+    : Array[Int] = {
+    val partitionsByTarget = msg.msg.asInstanceOf[GearpumpTuple].targetPartitions
+    partitionsByTarget.get(target) match {
+      case Some(partitions) => partitions
+      case None => Array(Partitioner.UNKNOWN_PARTITION_ID)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
new file mode 100644
index 0000000..a70ce48
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.processor
+
+import java.util.{Collection => JCollection, List => JList}
+
+import backtype.storm.task.IOutputCollector
+import backtype.storm.tuple.Tuple
+import org.apache.gearpump.experiments.storm.topology.TimedTuple
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.experiments.storm.util.StormOutputCollector
+import org.apache.gearpump.streaming.task.ReportCheckpointClock
+
+/**
+ * Output collector given to a Storm bolt.
+ *
+ * Emits are delegated to the wrapped [[StormOutputCollector]] (anchors are ignored). When
+ * acking is enabled, acked [[TimedTuple]]s drive the checkpoint clock reported to the
+ * AppMaster.
+ */
+private[storm] class StormBoltOutputCollector(collector: StormOutputCollector,
+    ackEnabled: Boolean = false) extends IOutputCollector {
+  // earliest clock value at which the next report may be sent
+  private var reportTime = 0L
+  // largest timestamp among the tuples acked so far
+  private var maxAckTime = 0L
+
+  override def emit(
+      streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): JList[Integer] =
+    collector.emit(streamId, tuple)
+
+  override def emitDirect(
+      taskId: Int, streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): Unit =
+    collector.emitDirect(taskId, streamId, tuple)
+
+  override def fail(tuple: Tuple): Unit = {
+    // An application failure is surfaced as an exception so that the tuple can be replayed.
+    // The tuple is deliberately left out of the message: its messageId is null and printing
+    // it would trigger an NPE
+    throw new Exception("Storm Bolt.execute failed")
+  }
+
+  override def ack(tuple: Tuple): Unit = tuple match {
+    case timed: TimedTuple if ackEnabled =>
+      maxAckTime = Math.max(maxAckTime, timed.timestamp)
+      val context = collector.taskContext
+      val minClock = context.upstreamMinClock
+      val dueForReport = reportTime <= minClock && minClock <= maxAckTime
+      if (dueForReport) {
+        // round the clock down to a multiple of the checkpoint interval before reporting
+        reportTime = minClock / CHECKPOINT_INTERVAL_MILLIS * CHECKPOINT_INTERVAL_MILLIS
+        context.appMaster ! ReportCheckpointClock(context.taskId, reportTime)
+        reportTime += CHECKPOINT_INTERVAL_MILLIS
+      }
+    case _ =>
+      // acking disabled, or a tuple without timestamp: nothing to do
+  }
+
+  override def reportError(throwable: Throwable): Unit = throw throwable
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
new file mode 100644
index 0000000..1d3048e
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.processor
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.Duration
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
+import org.apache.gearpump.experiments.storm.util._
+import org.apache.gearpump.streaming.task._
+
+object StormProcessor {
+  // message a StormProcessor sends to itself to trigger a tick on the wrapped bolt
+  private[storm] val TICK = Message("tick")
+}
+
+/**
+ * Runtime container that runs a Storm bolt as a Gearpump task.
+ *
+ * Regular messages are handed to the bolt. When a tick frequency is available the task
+ * also sends itself a TICK message every `freq` seconds and forwards each one to the bolt
+ * as a tick.
+ */
+private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt,
+    taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+  import org.apache.gearpump.experiments.storm.processor.StormProcessor._
+
+  /** Resolves the wrapped bolt from the task context and the user config. */
+  def this(taskContext: TaskContext, conf: UserConfig) = {
+    this(StormUtil.getGearpumpStormComponent(taskContext, conf)(taskContext.system)
+      .asInstanceOf[GearpumpBolt], taskContext, conf)
+  }
+
+  // tick interval in seconds; empty when the bolt reports no tick frequency
+  private val tickFrequency = gearpumpBolt.getTickFrequency
+
+  /** Starts the bolt and, if ticks are requested, arms the first tick timer. */
+  override def onStart(startTime: StartTime): Unit = {
+    gearpumpBolt.start(startTime)
+    tickFrequency.foreach(scheduleTick)
+  }
+
+  /** Routes TICK messages to the bolt's tick handling and everything else to the bolt. */
+  override def onNext(message: Message): Unit = {
+    if (TICK == message) {
+      tickFrequency.foreach { seconds =>
+        gearpumpBolt.tick(seconds)
+        // re-arm the timer for the following tick
+        scheduleTick(seconds)
+      }
+    } else {
+      gearpumpBolt.next(message)
+    }
+  }
+
+  /** Schedules a single TICK message to this task after `freq` seconds. */
+  private def scheduleTick(freq: Int): Unit = {
+    val delay = Duration(freq, TimeUnit.SECONDS)
+    taskContext.scheduleOnce(delay)(self ! TICK)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
new file mode 100644
index 0000000..5d4a6a2
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.producer
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.Actor.Receive
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
+import org.apache.gearpump.experiments.storm.util._
+import org.apache.gearpump.streaming.task._
+
+import scala.concurrent.duration.Duration
+
+object StormProducer {
+  // message a StormProducer sends to itself when the message timeout timer fires
+  private[storm] val TIMEOUT = Message("timeout")
+}
+
+/**
+ * Runtime container that runs a Storm spout as a Gearpump task.
+ *
+ * The task drives the spout by messaging itself: a "start" message kicks off the emit
+ * loop and every handled emit message enqueues one "continue" message. Independently of
+ * that loop, a TIMEOUT timer (when message timeouts are configured) fails overdue pending
+ * messages, and a checkpoint clock is polled from the AppMaster when acking is enabled.
+ */
+private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
+    taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+  import org.apache.gearpump.experiments.storm.producer.StormProducer._
+
+  /** Resolves the wrapped spout from the task context and the user config. */
+  def this(taskContext: TaskContext, conf: UserConfig) = {
+    this(StormUtil.getGearpumpStormComponent(taskContext, conf)(taskContext.system)
+      .asInstanceOf[GearpumpSpout], taskContext, conf)
+  }
+
+  // message timeout in milliseconds; empty when timeouts are not enabled
+  private val timeoutMillis = gearpumpSpout.getMessageTimeout
+
+  /** Starts the spout, arms the optional timers and kicks off the emit loop. */
+  override def onStart(startTime: StartTime): Unit = {
+    gearpumpSpout.start(startTime)
+    if (gearpumpSpout.ackEnabled) {
+      getCheckpointClock()
+    }
+    timeoutMillis.foreach(scheduleTimeout)
+    self ! Message("start")
+  }
+
+  /**
+   * Handles the self-sent messages: TIMEOUT fails overdue pending messages and re-arms
+   * the timer, any other message asks the spout for the next tuple and continues the loop.
+   */
+  override def onNext(msg: Message): Unit = {
+    msg match {
+      case TIMEOUT =>
+        // The timer re-arms itself. It must not enqueue a "continue" as well, otherwise
+        // every timeout would start one more self-sustaining emit loop.
+        timeoutMillis.foreach { timeout =>
+          gearpumpSpout.timeout(timeout)
+          scheduleTimeout(timeout)
+        }
+      case _ =>
+        gearpumpSpout.next(msg)
+        // keep the single emit loop running
+        self ! Message("continue")
+    }
+  }
+
+  /** Forwards a received checkpoint clock to the spout, then schedules the next poll. */
+  override def receiveUnManagedMessage: Receive = {
+    case CheckpointClock(optClock) =>
+      optClock.foreach { clock =>
+        gearpumpSpout.checkpoint(clock)
+      }
+      getCheckpointClock()
+  }
+
+  /** Asks the AppMaster for the checkpoint clock after one checkpoint interval. */
+  def getCheckpointClock(): Unit = {
+    taskContext.scheduleOnce(Duration(StormConstants.CHECKPOINT_INTERVAL_MILLIS,
+      TimeUnit.MILLISECONDS))(taskContext.appMaster ! GetCheckpointClock)
+  }
+
+  /** Schedules a single TIMEOUT message to this task after `timeout` milliseconds. */
+  private def scheduleTimeout(timeout: Long): Unit = {
+    taskContext.scheduleOnce(Duration(timeout, TimeUnit.MILLISECONDS)) {
+      self ! TIMEOUT
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
new file mode 100644
index 0000000..5794b1d
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.producer
+
+import java.util.{List => JList}
+
+import backtype.storm.spout.{ISpout, ISpoutOutputCollector}
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.experiments.storm.util.StormOutputCollector
+
+// A spout message waiting to be acked or failed: its message id, the time attached to the
+// emitted message (compared against the checkpoint clock) and the time it was emitted
+// (compared against the message timeout).
+case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp)
+
+/**
+ * Output collector handed to a Storm spout for emitting messages.
+ *
+ * Every emit is stamped with the current wall-clock time. With acking enabled at most two
+ * messages are tracked at a time (the pending one and its successor); all other emitted
+ * messages are acked to the spout immediately. With acking disabled every message is
+ * acked immediately.
+ */
+private[storm] class StormSpoutOutputCollector(
+    collector: StormOutputCollector, spout: ISpout, ackEnabled: Boolean)
+  extends ISpoutOutputCollector {
+
+  // latest checkpoint clock passed to ackPendingMessage
+  private var checkpointClock = 0L
+  // message currently waiting for ack or fail
+  private var pendingMessage: Option[PendingMessage] = None
+  // message that becomes pending once the current one is resolved
+  private var nextPendingMessage: Option[PendingMessage] = None
+
+  /** Emits the values on the stream and returns the task ids reported by the collector. */
+  override def emit(streamId: String, values: JList[AnyRef], messageId: Object): JList[Integer] = {
+    val now = System.currentTimeMillis()
+    collector.setTimestamp(now)
+    val targetTasks = collector.emit(streamId, values)
+    setPendingOrAck(messageId, now, now)
+    targetTasks
+  }
+
+  /** Propagates an error reported by the spout by rethrowing it. */
+  override def reportError(throwable: Throwable): Unit = {
+    throw throwable
+  }
+
+  /** Emits the values on the stream directly to the given task. */
+  override def emitDirect(taskId: Int, streamId: String, values: JList[AnyRef], messageId: Object)
+    : Unit = {
+    val now = System.currentTimeMillis()
+    collector.setTimestamp(now)
+    collector.emitDirect(taskId, streamId, values)
+    setPendingOrAck(messageId, now, now)
+  }
+
+  /**
+   * Records the checkpoint clock and acks the pending message once the clock has reached
+   * the message time of its successor.
+   */
+  def ackPendingMessage(checkpointClock: TimeStamp): Unit = {
+    this.checkpointClock = checkpointClock
+    val successorReached = nextPendingMessage.exists(_.messageTime <= this.checkpointClock)
+    if (successorReached) {
+      pendingMessage.foreach { pending =>
+        spout.ack(pending.id)
+        reset()
+      }
+    }
+  }
+
+  /** Fails the pending message if it was emitted at least `timeoutMillis` ago. */
+  def failPendingMessage(timeoutMillis: Long): Unit = {
+    pendingMessage.foreach { pending =>
+      val elapsed = System.currentTimeMillis() - pending.startTime
+      if (elapsed >= timeoutMillis) {
+        spout.fail(pending.id)
+        reset()
+      }
+    }
+  }
+
+  /** Promotes the successor to pending and clears the successor slot. */
+  private def reset(): Unit = {
+    pendingMessage = nextPendingMessage
+    nextPendingMessage = None
+  }
+
+  /**
+   * Tracks the emitted message in a free slot, or acks it right away when acking is
+   * disabled or no slot is available.
+   */
+  private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp)
+    : Unit = {
+    if (!ackEnabled) {
+      spout.ack(messageId)
+    } else {
+      val candidate = PendingMessage(messageId, messageTime, startTime)
+      pendingMessage match {
+        case None =>
+          pendingMessage = Some(candidate)
+        case Some(current)
+          if nextPendingMessage.isEmpty && current.messageTime <= this.checkpointClock =>
+          // the pending message is already covered by the checkpoint clock; queue a successor
+          nextPendingMessage = Some(candidate)
+        case Some(_) =>
+          spout.ack(messageId)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
new file mode 100644
index 0000000..d0f2949
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.topology
+
+import java.io.{File, FileOutputStream, IOException}
+import java.util
+import java.util.jar.JarFile
+import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
+
+import akka.actor.ActorRef
+import akka.pattern.ask
+import backtype.storm.Config
+import backtype.storm.generated.{Bolt, ComponentCommon, SpoutSpec, StormTopology}
+import backtype.storm.metric.api.IMetric
+import backtype.storm.spout.{ISpout, SpoutOutputCollector}
+import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
+import backtype.storm.tuple.{Fields, Tuple, TupleImpl}
+import backtype.storm.utils.Utils
+import clojure.lang.Atom
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.gearpump.experiments.storm.processor.StormBoltOutputCollector
+import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.experiments.storm.util.StormUtil._
+import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil}
+import org.apache.gearpump.streaming.DAG
+import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime}
+import org.apache.gearpump.util.{Constants, LogUtil}
+import org.apache.gearpump.{Message, TimeStamp}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{Await, Future}
+
+/**
+ * Common lifecycle of the wrappers around Storm spouts and bolts.
+ * Subclasses wrap a Storm Spout or Bolt together with its lifecycle, hiding that
+ * complexity from Gearpump applications.
+ */
+trait GearpumpStormComponent {
+  /**
+   * Invoked at Task.onStart.
+   * @param startTime task start time
+   */
+  def start(startTime: StartTime): Unit
+
+  /**
+   * Invoked at Task.onNext.
+   * @param message incoming message
+   */
+  def next(message: Message): Unit
+
+  /**
+   * Intended to be invoked at Task.onStop; does nothing by default.
+   */
+  def stop(): Unit = {}
+}
+
+object GearpumpStormComponent {
+  // logger shared by the factories and helpers of this object
+  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormComponent])
+
+  object GearpumpSpout {
+    /**
+     * Creates a [[GearpumpSpout]] from a spout spec.
+     *
+     * The spout object is deserialized from the spec, the config is normalized with the
+     * per-component settings, and the factories for the topology context and the output
+     * collector are prepared so they can be invoked when the component starts.
+     */
+    def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef],
+        spoutSpec: SpoutSpec, taskContext: TaskContext): GearpumpSpout = {
+      val common = spoutSpec.get_common()
+      // snapshot the java config as an immutable scala map before normalizing
+      val normalizedConfig = normalizeConfig(config.asScala.toMap, common)
+      val topologyContextFor = (dag: DAG, taskId: TaskId) =>
+        buildTopologyContext(dag, topology, normalizedConfig, gearpumpTaskIdToStorm(taskId))
+
+      val spout = Utils.getSetComponentObject(spoutSpec.get_spout_object()).asInstanceOf[ISpout]
+      val ackEnabled = StormUtil.ackEnabled(config)
+      if (ackEnabled) {
+        // warn when acking is requested for a spout that is not known to be replayable
+        val className = spout.getClass.getName
+        if (!isSequentiallyReplayableSpout(className)) {
+          LOG.warn(s"at least once is not supported for $className")
+        }
+      }
+
+      val outputCollectorFor = (context: TaskContext, topologyContext: TopologyContext) =>
+        new StormSpoutOutputCollector(
+          StormOutputCollector(context, topologyContext), spout, ackEnabled)
+
+      GearpumpSpout(normalizedConfig, spout, askAppMasterForDAG, topologyContextFor,
+        outputCollectorFor, ackEnabled, taskContext)
+    }
+
+    /** True only for the spout class this adapter treats as sequentially replayable. */
+    private def isSequentiallyReplayableSpout(className: String): Boolean =
+      className.equals("storm.kafka.KafkaSpout")
+  }
+
+  /**
+   * Wraps a Storm spout together with the factories needed to set it up inside a task.
+   * The output collector is created in `start`; `checkpoint` and `timeout` rely on it.
+   */
+  case class GearpumpSpout(
+      config: JMap[AnyRef, AnyRef],
+      spout: ISpout,
+      getDAG: ActorRef => DAG,
+      getTopologyContext: (DAG, TaskId) => TopologyContext,
+      getOutputCollector: (TaskContext, TopologyContext) => StormSpoutOutputCollector,
+      ackEnabled: Boolean,
+      taskContext: TaskContext)
+    extends GearpumpStormComponent {
+
+    // created in start(); null until then
+    private var collector: StormSpoutOutputCollector = null
+
+    /** Fetches the DAG, builds the topology context and collector, then opens the spout. */
+    override def start(startTime: StartTime): Unit = {
+      val topologyContext = getTopologyContext(getDAG(taskContext.appMaster), taskContext.taskId)
+      collector = getOutputCollector(taskContext, topologyContext)
+      val stormCollector = new SpoutOutputCollector(collector)
+      spout.open(config, topologyContext, stormCollector)
+    }
+
+    /** Asks the spout for its next tuple; the content of the message is not used. */
+    override def next(message: Message): Unit = spout.nextTuple()
+
+    /**
+     * @return timeout in milliseconds if enabled
+     */
+    def getMessageTimeout: Option[Long] = {
+      for {
+        timeoutEnabled <- StormUtil.getBoolean(config, Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)
+        if timeoutEnabled
+        timeoutSecs <- StormUtil.getInt(config, Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)
+      } yield timeoutSecs * 1000L
+    }
+
+    /** Passes the checkpoint clock on to the collector so pending messages can be acked. */
+    def checkpoint(clock: TimeStamp): Unit = collector.ackPendingMessage(clock)
+
+    /** Lets the collector fail the pending message if it is older than `timeoutMillis`. */
+    def timeout(timeoutMillis: Long): Unit = collector.failPendingMessage(timeoutMillis)
+  }
+
+  object GearpumpBolt {
+    /**
+     * Creates a [[GearpumpBolt]] from a bolt spec.
+     *
+     * The bolt object is deserialized from the spec, the config is normalized with the
+     * per-component settings, and the factories for the contexts, the output collector
+     * and the tick tuple are prepared for later use.
+     */
+    def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef],
+        boltSpec: Bolt, taskContext: TaskContext): GearpumpBolt = {
+      // snapshot the java config as an immutable scala map before normalizing
+      val immutableConfig = config.asScala.toMap
+      val normalizedConfig = normalizeConfig(immutableConfig, boltSpec.get_common())
+
+      val topologyContextFor = (dag: DAG, taskId: TaskId) =>
+        buildTopologyContext(dag, topology, normalizedConfig, gearpumpTaskIdToStorm(taskId))
+      val generalContextFor = (dag: DAG) =>
+        buildGeneralTopologyContext(dag, topology, normalizedConfig)
+      val outputCollectorFor = (context: TaskContext, topologyContext: TopologyContext) =>
+        StormOutputCollector(context, topologyContext)
+      val tickTupleFor = (generalContext: GeneralTopologyContext, freq: Int) => {
+        // a java list holding the boxed frequency, as required by the Java interface
+        val values = new util.ArrayList[Object]
+        values.add(Int.box(freq))
+        new TupleImpl(generalContext, values, SYSTEM_TASK_ID, SYSTEM_TICK_STREAM_ID, null)
+      }
+
+      val bolt = Utils.getSetComponentObject(boltSpec.get_bolt_object()).asInstanceOf[IBolt]
+      GearpumpBolt(normalizedConfig, bolt, askAppMasterForDAG, topologyContextFor,
+        generalContextFor, outputCollectorFor, tickTupleFor, taskContext)
+    }
+  }
+
+  /**
+   * Wraps a Storm bolt together with the factories needed to set it up inside a task.
+   * The contexts and the collector are created in `start`; `next` and `tick` rely on them.
+   */
+  case class GearpumpBolt(
+      config: JMap[AnyRef, AnyRef],
+      bolt: IBolt,
+      getDAG: ActorRef => DAG,
+      getTopologyContext: (DAG, TaskId) => TopologyContext,
+      getGeneralTopologyContext: DAG => GeneralTopologyContext,
+      getOutputCollector: (TaskContext, TopologyContext) => StormOutputCollector,
+      getTickTuple: (GeneralTopologyContext, Int) => Tuple,
+      taskContext: TaskContext)
+    extends GearpumpStormComponent {
+
+    // all of the following are assigned in start() and are null before that
+    private var collector: StormOutputCollector = null
+    private var topologyContext: TopologyContext = null
+    private var generalTopologyContext: GeneralTopologyContext = null
+    // created on the first tick and reused afterwards
+    private var tickTuple: Tuple = null
+
+    /** Fetches the DAG, builds the contexts and the collector, then prepares the bolt. */
+    override def start(startTime: StartTime): Unit = {
+      val dag = getDAG(taskContext.appMaster)
+      topologyContext = getTopologyContext(dag, taskContext.taskId)
+      generalTopologyContext = getGeneralTopologyContext(dag)
+      collector = getOutputCollector(taskContext, topologyContext)
+      val stormCollector =
+        new OutputCollector(new StormBoltOutputCollector(collector, StormUtil.ackEnabled(config)))
+      bolt.prepare(config, topologyContext, stormCollector)
+    }
+
+    /** Converts the incoming message to a Storm tuple and lets the bolt execute it. */
+    override def next(message: Message): Unit = {
+      val eventTime = message.timestamp
+      // tuples emitted while handling this message carry its timestamp
+      collector.setTimestamp(eventTime)
+      val gearpumpTuple = message.msg.asInstanceOf[GearpumpTuple]
+      bolt.execute(gearpumpTuple.toTuple(generalTopologyContext, eventTime))
+    }
+
+    /** Reads the tick frequency from the config, if one is set. */
+    def getTickFrequency: Option[Int] =
+      StormUtil.getInt(config, Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)
+
+    /**
+     * invoked at TICK message when "topology.tick.tuple.freq.secs" is configured
+     * @param freq tick frequency
+     */
+    def tick(freq: Int): Unit = {
+      if (tickTuple eq null) {
+        tickTuple = getTickTuple(generalTopologyContext, freq)
+      }
+      bolt.execute(tickTuple)
+    }
+  }
+
+  /**
+   * Merges the general config with the per-component overrides for
+   * "topology.transactional.id" and "topology.tick.tuple.freq.secs".
+   * @param stormConfig general config for all components
+   * @param componentCommon common component parts
+   * @return a new mutable java map; the inputs are not modified
+   */
+  private def normalizeConfig(stormConfig: Map[AnyRef, AnyRef],
+      componentCommon: ComponentCommon): JMap[AnyRef, AnyRef] = {
+    val merged: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](stormConfig.asJava)
+    val componentConfig = parseJsonStringToMap(componentCommon.get_json_conf())
+    // only these two keys may be overridden per component
+    Seq(Config.TOPOLOGY_TRANSACTIONAL_ID, Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS).foreach { key =>
+      Option(componentConfig.get(key)).foreach(value => merged.put(key, value))
+    }
+    merged
+  }
+
+  /**
+   * Asks the AppMaster for the current DAG and blocks until the reply arrives or the
+   * configured future timeout elapses.
+   * @param appMaster actor to send the GetDAG request to
+   * @return the DAG contained in the reply
+   */
+  private def askAppMasterForDAG(appMaster: ActorRef): DAG = {
+    implicit val timeout = Constants.FUTURE_TIMEOUT
+    // mapTo checks the reply type when the future completes, instead of an unchecked cast
+    // of the erased Future type
+    val dagFuture: Future[DAG] = (appMaster ? GetDAG).mapTo[DAG]
+    Await.result(dagFuture, timeout.duration)
+  }
+
+  /**
+   * Builds a Storm GeneralTopologyContext for the given DAG and topology.
+   * The task/component mappings are derived from the DAG and the stream fields from the
+   * topology; the last constructor argument is passed as null.
+   */
+  private def buildGeneralTopologyContext(dag: DAG, topology: StormTopology, stormConf: JMap[_, _])
+    : GeneralTopologyContext = {
+    val taskToComponent = getTaskToComponent(dag)
+    val componentToSortedTasks: JMap[String, JList[Integer]] =
+      getComponentToSortedTasks(taskToComponent)
+    val componentToStreamFields: JMap[String, JMap[String, Fields]] =
+      getComponentToStreamFields(topology)
+    new GeneralTopologyContext(
+      topology, stormConf, taskToComponent.asJava, componentToSortedTasks,
+      componentToStreamFields, null)
+  }
+
+  /**
+   * Builds a Storm TopologyContext for one task of the given DAG and topology.
+   * The task/component mappings are derived from the DAG and the stream fields from the
+   * topology. The code and pid directories are created as a side effect; several
+   * constructor arguments are passed as null or as empty maps.
+   */
+  private def buildTopologyContext(
+      dag: DAG, topology: StormTopology, stormConf: JMap[_, _], stormTaskId: Integer)
+    : TopologyContext = {
+    val taskToComponent = getTaskToComponent(dag)
+    val componentToSortedTasks: JMap[String, JList[Integer]] =
+      getComponentToSortedTasks(taskToComponent)
+    val componentToStreamFields: JMap[String, JMap[String, Fields]] =
+      getComponentToStreamFields(topology)
+    // directories needed by the ShellBolt workarounds (see mkCodeDir / mkPidDir)
+    val codeDir = mkCodeDir
+    val pidDir = mkPidDir
+
+    new TopologyContext(topology, stormConf, taskToComponent.asJava, componentToSortedTasks,
+      componentToStreamFields, null, codeDir, pidDir, stormTaskId, null, null, null, null,
+      new JHashMap[String, AnyRef], new JHashMap[Integer, JMap[Integer, JMap[String, IMetric]]],
+      new Atom(false))
+  }
+
+  /**
+   * Maps every component id of the topology (spouts, bolts and the system component) to
+   * the output fields of each of its streams.
+   */
+  private def getComponentToStreamFields(topology: StormTopology)
+    : JMap[String, JMap[String, Fields]] = {
+    val spoutFields = topology.get_spouts().asScala.map { case (spoutId, spec) =>
+      spoutId -> getComponentToFields(spec.get_common())
+    }
+    val boltFields = topology.get_bolts().asScala.map { case (boltId, spec) =>
+      boltId -> getComponentToFields(spec.get_common())
+    }
+
+    // the system component only exposes the tick stream
+    val tickStreamFields =
+      Map(SYSTEM_TICK_STREAM_ID -> new Fields(SYSTEM_COMPONENT_OUTPUT_FIELDS)).asJava
+    val systemFields = Map(SYSTEM_COMPONENT_ID -> tickStreamFields)
+
+    (spoutFields ++ boltFields ++ systemFields).asJava
+  }
+
+  /** Maps each stream id declared by the component to that stream's output fields. */
+  private def getComponentToFields(common: ComponentCommon): JMap[String, Fields] = {
+    common.get_streams.asScala.map { case (streamId, streamInfo) =>
+      (streamId, new Fields(streamInfo.get_output_fields()))
+    }.asJava
+  }
+
+  /** Inverts the task-to-component map into component id to ascending list of task ids. */
+  private def getComponentToSortedTasks(
+      taskToComponent: Map[Integer, String]): JMap[String, JList[Integer]] = {
+    val tasksByComponent = taskToComponent.groupBy { case (_, component) => component }
+    tasksByComponent.map { case (component, tasks) =>
+      val sortedTaskIds: JList[Integer] = tasks.keys.toList.sorted.asJava
+      component -> sortedTaskIds
+    }.asJava
+  }
+
+  /**
+   * Maps the Storm task id of every task in the DAG to the Storm component id stored in
+   * the task config of its processor. Fails if a processor carries no component id.
+   */
+  private def getTaskToComponent(dag: DAG): Map[Integer, String] = {
+    dag.processors.flatMap { case (processorId, description) =>
+      val taskCount = description.parallelism
+      val component = description.taskConf.getString(STORM_COMPONENT).get
+      // one entry per task index of the processor
+      (0 until taskCount).map { index =>
+        gearpumpTaskIdToStorm(TaskId(processorId, index)) -> component
+      }
+    }
+  }
+
+  /**
+   * Workaround to support Storm ShellBolt: makes sure a "pid" directory exists under the
+   * system temp directory.
+   * @return the path of the pid directory, even when it could not be created
+   */
+  private def mkPidDir: String = {
+    val pidDir = FileUtils.getTempDirectoryPath + File.separator + "pid"
+    try {
+      FileUtils.forceMkdir(new File(pidDir))
+    } catch {
+      case ex: IOException =>
+        // best effort: keep going, but do not lose the cause of the failure
+        LOG.error(s"failed to create pid directory $pidDir", ex)
+    }
+    pidDir
+  }
+
+  /**
+   * Workaround to support Storm ShellBolt: extracts the last jar on the class path into
+   * a "storm" directory under the system temp directory.
+   *
+   * Extraction is best effort; failures are logged and do not abort the caller.
+   * @return the path of the "resources" directory inside the extraction directory
+   */
+  private def mkCodeDir: String = {
+    // class path entries are separated by ":" on Unix but ";" on Windows
+    val jarPath = System.getProperty("java.class.path").split(File.pathSeparator).last
+    val destDir = FileUtils.getTempDirectoryPath + File.separator + "storm"
+
+    try {
+      FileUtils.forceMkdir(new File(destDir))
+
+      val jar = new JarFile(jarPath)
+      try {
+        jar.entries().asScala.foreach { entry =>
+          val file = new File(destDir + File.separator + entry.getName)
+          if (!entry.isDirectory) {
+            file.getParentFile.mkdirs()
+
+            // nested try/finally so that each stream is closed even if opening or
+            // closing the other one fails
+            val is = jar.getInputStream(entry)
+            try {
+              val fos = new FileOutputStream(file)
+              try {
+                IOUtils.copy(is, fos)
+              } catch {
+                case ex: IOException =>
+                  LOG.error(s"failed to copy data from ${entry.getName} to ${file.getName}", ex)
+              } finally {
+                fos.close()
+              }
+            } finally {
+              is.close()
+            }
+          }
+        }
+      } finally {
+        // release the handle on the jar file
+        jar.close()
+      }
+    } catch {
+      case ex: IOException =>
+        LOG.error(s"could not extract $destDir from $jarPath", ex)
+    }
+
+    destDir + File.separator + "resources"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
new file mode 100644
index 0000000..62bc25c
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.topology
+
+import java.lang.{Iterable => JIterable}
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
+
+import akka.actor.ActorSystem
+import backtype.storm.Config
+import backtype.storm.generated._
+import backtype.storm.utils.{ThriftTopologyUtils, Utils}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.processor.StormProcessor
+import org.apache.gearpump.experiments.storm.producer.StormProducer
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.experiments.storm.util.StormUtil
+import org.apache.gearpump.experiments.storm.util.StormUtil._
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.task.Task
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+// TODO: Refactor this file, we should disable using of JavaConversions
+// scalastyle:off javaconversions
+import scala.collection.JavaConversions._
+// scalastyle:on javaconversions
+
+object GearpumpStormTopology {
+  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology])
+
+  /**
+   * Creates a GearpumpStormTopology whose system config is read through
+   * Utils.readStormConfig() and whose application config is parsed from a JSON string.
+   *
+   * @param name topology name
+   * @param topology Storm topology
+   * @param appConfigInJson application config as a JSON string
+   * @param system actor system passed on to the topology
+   */
+  def apply(
+      name: String,
+      topology: StormTopology,
+      appConfigInJson: String)(implicit system: ActorSystem): GearpumpStormTopology = {
+    val sysConfig = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
+    val appConfig = parseJsonStringToMap(appConfigInJson)
+    new GearpumpStormTopology(name, topology, sysConfig, appConfig)
+  }
+}
+
+/**
+ * A wrapper over a Storm topology which
+ * 1. merges Storm and Gearpump configs
+ * 2. creates Gearpump processors
+ * 3. provides an interface for Gearpump applications to use the Storm topology
+ *
+ * An implicit ActorSystem is required to create Gearpump processors.
+ * @param name topology name
+ * @param topology Storm topology
+ * @param sysConfig configs from "defaults.yaml" and custom config file
+ * @param appConfig config submitted from user application
+ */
+private[storm] class GearpumpStormTopology(
+    name: String,
+    topology: StormTopology,
+    sysConfig: JMap[AnyRef, AnyRef],
+    appConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) {
+
+  private val spouts = topology.get_spouts()
+  private val bolts = topology.get_bolts()
+  // merged from system config, application config and the JSON configs of all spouts and bolts;
+  // it is read by the processor fields below, so it has to be initialized before them
+  private val stormConfig = mergeConfigs(sysConfig, appConfig, getComponentConfigs(spouts, bolts))
+  // one Gearpump processor per spout, keyed by spout id
+  private val spoutProcessors = spouts.map { case (id, spout) =>
+    id -> spoutToProcessor(id, spout, stormConfig.toMap)
+  }.toMap
+  // one Gearpump processor per bolt, keyed by bolt id
+  private val boltProcessors = bolts.map { case (id, bolt) =>
+    id -> boltToProcessor(id, bolt, stormConfig.toMap)
+  }.toMap
+  // on a duplicate id the bolt processor replaces the spout processor
+  private val allProcessors = spoutProcessors ++ boltProcessors
+
+  /**
+   * Merged Storm config, with priority
+   * defaults.yaml < custom file config < application config < component config
+   *
+   * @return the config map held by this topology (not a copy)
+   */
+  def getStormConfig: JMap[AnyRef, AnyRef] = {
+    stormConfig
+  }
+
+  /**
+   * @return Gearpump processors of all spouts and bolts, keyed by Storm component id
+   */
+  def getProcessors: Map[String, Processor[Task]] = {
+    allProcessors
+  }
+
+  /**
+   * Looks up the components that consume the output of a source component.
+   *
+   * @param sourceId source component id
+   * @return target Storm component ids mapped to their Gearpump (bolt) processors
+   */
+  def getTargets(sourceId: String): Map[String, Processor[Task]] = {
+    val targetIds = getTargets(sourceId, topology).keys
+    targetIds.map { targetId =>
+      targetId -> boltProcessors(targetId)
+    }.toMap
+  }
+
+  /**
+   * Merges configs, later sources overriding earlier ones: system config, application config,
+   * then the settings merged from component configs. The topology name is put last.
+   */
+  private def mergeConfigs(
+      sysConfig: JMap[AnyRef, AnyRef],
+      appConfig: JMap[AnyRef, AnyRef],
+      componentConfigs: Iterable[JMap[AnyRef, AnyRef]]): JMap[AnyRef, AnyRef] = {
+    val merged = new JHashMap[AnyRef, AnyRef]
+    merged.putAll(sysConfig)
+    merged.putAll(appConfig)
+    // component configs are merged against the system + application configs collected so far
+    val existing: Map[AnyRef, AnyRef] = merged.toMap
+    val fromComponents = getMergedComponentConfig(componentConfigs, existing)
+    merged.putAll(fromComponents)
+    merged.put(Config.TOPOLOGY_NAME, name)
+    merged
+  }
+
+  /**
+   * Builds the Gearpump processor that runs a Storm spout.
+   * The spout id is stored in the task config under STORM_COMPONENT and is also passed to
+   * the Processor factory.
+   *
+   * @param spoutId spout id
+   * @param spoutSpec spout spec
+   * @param stormConfig merged storm config
+   * @param system actor system
+   * @return a Processor[StormProducer]
+   */
+  private def spoutToProcessor(spoutId: String, spoutSpec: SpoutSpec,
+      stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): Processor[Task] = {
+    val common = spoutSpec.get_common()
+    val taskConf = UserConfig.empty.withString(STORM_COMPONENT, spoutId)
+    Processor[StormProducer](getParallelism(stormConfig, common), spoutId, taskConf)
+  }
+
+  /**
+   * Builds the Gearpump processor that runs a Storm bolt.
+   * The task config carries the bolt id under STORM_COMPONENT and sets
+   * "state.checkpoint.enable" to the result of StormUtil.ackEnabled for the merged config.
+   *
+   * @param boltId bolt id
+   * @param boltSpec bolt spec
+   * @param stormConfig merged storm config
+   * @param system actor system
+   * @return a Processor[StormProcessor]
+   */
+  private def boltToProcessor(boltId: String, boltSpec: Bolt,
+      stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): Processor[Task] = {
+    val common = boltSpec.get_common()
+    val componentConf = UserConfig.empty.withString(STORM_COMPONENT, boltId)
+    val checkpointEnabled = StormUtil.ackEnabled(stormConfig)
+    val taskConf = componentConf.withBoolean("state.checkpoint.enable", checkpointEnabled)
+    Processor[StormProcessor](getParallelism(stormConfig, common), boltId, taskConf)
+  }
+
+  /**
+   * Collects, over all components of the topology, the inputs that read from componentId.
+   *
+   * @return target component id -> (input stream id -> grouping)
+   */
+  private def getTargets(componentId: String, topology: StormTopology)
+    : Map[String, Map[String, Grouping]] = {
+    val componentIds = ThriftTopologyUtils.getComponentIds(topology)
+    // pair every component with each of its declared inputs
+    val componentInputs = componentIds.flatMap { targetId =>
+      getInputs(targetId, topology).toList.map(targetId -> _)
+    }
+    componentInputs.foldLeft(Map.empty[String, Map[String, Grouping]]) {
+      case (targets, (targetId, (globalStreamId, grouping))) =>
+        val streamId = globalStreamId.get_streamId()
+        val upstreamId = globalStreamId.get_componentId
+        if (!upstreamId.equals(componentId)) {
+          // this input is fed by another component
+          targets
+        } else {
+          val streams = targets.getOrElse(targetId, Map.empty[String, Grouping])
+          targets + (targetId -> (streams + (streamId -> grouping)))
+        }
+    }
+  }
+
+  /**
+   * @return the inputs (global stream id -> grouping) declared by a Storm component
+   */
+  private def getInputs(componentId: String, topology: StormTopology)
+    : JMap[GlobalStreamId, Grouping] = {
+    val common = ThriftTopologyUtils.getComponentCommon(topology, componentId)
+    common.get_inputs
+  }
+
+  /**
+   * Computes the parallelism of a Storm component by the following rules,
+   * 1. use "topology.tasks" if defined; otherwise use parallelism_hint (1 if the hint is unset)
+   * 2. parallelism should not be larger than "topology.max.task.parallelism" if defined
+   * 3. component config overrides system config
+   * @param stormConfig System configs without merging "topology.tasks" and
+   *                    "topology.max.task.parallelism" of component
+   * @param component common part of the component, providing the hint and its JSON config
+   * @return number of task instances for a component
+   */
+  private def getParallelism(stormConfig: Map[AnyRef, AnyRef], component: ComponentCommon): Int = {
+    val parallelismHint: Int =
+      if (component.is_set_parallelism_hint()) component.get_parallelism_hint() else 1
+    val componentConfig = parseJsonStringToMap(component.get_json_conf)
+    // component config is put last so that it overrides the system config
+    val mergedConfig = new JHashMap[AnyRef, AnyRef]
+    mergedConfig.putAll(stormConfig)
+    mergedConfig.putAll(componentConfig)
+    val numTasks: Int = getInt(mergedConfig, Config.TOPOLOGY_TASKS).getOrElse(parallelismHint)
+    getInt(mergedConfig, Config.TOPOLOGY_MAX_TASK_PARALLELISM) match {
+      case Some(maxParallelism) => math.min(maxParallelism, numTasks)
+      case None => numTasks
+    }
+  }
+
+  /**
+   * @return the parsed JSON config of every spout, followed by that of every bolt
+   */
+  private def getComponentConfigs(spouts: JMap[String, SpoutSpec],
+      bolts: JMap[String, Bolt]): Iterable[JMap[AnyRef, AnyRef]] = {
+    val spoutConfigs = spouts.map { case (_, spoutSpec) =>
+      parseJsonStringToMap(spoutSpec.get_common().get_json_conf())
+    }
+    val boltConfigs = bolts.map { case (_, boltSpec) =>
+      parseJsonStringToMap(boltSpec.get_common().get_json_conf())
+    }
+    spoutConfigs ++ boltConfigs
+  }
+
+  /**
+   * Merges the component configs "topology.kryo.decorators" and "topology.kryo.register".
+   * @param componentConfigs list of component configs
+   * @param allConfig existing configs without merging component configs
+   * @return the two configs merged from all the component configs and existing configs
+   */
+  private def getMergedComponentConfig(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
+      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
+    val kryoDecorators = getMergedKryoDecorators(componentConfigs, allConfig)
+    val kryoRegister = getMergedKryoRegister(componentConfigs, allConfig)
+    val merged: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    merged.putAll(kryoDecorators)
+    merged.putAll(kryoRegister)
+    merged
+  }
+
+  /**
+   * Collects the distinct kryo decorators declared by component configs and existing configs.
+   *
+   * @param componentConfigs list of component configs
+   * @param allConfig existing configs without merging component configs
+   * @return a merged config with a list of distinct kryo decorators from component and
+   *         existing configs; empty if no decorator is declared
+   * @throws IllegalArgumentException if a config value is not an Iterable of Strings
+   */
+  private def getMergedKryoDecorators(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
+      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
+    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1)
+    val key = Config.TOPOLOGY_KRYO_DECORATORS
+    val configs = getConfigValues(componentConfigs, allConfig, key)
+    val distincts = configs.foldLeft(Set.empty[String]) {
+      case (accum, config: JIterable[_]) =>
+        accum ++ config.map {
+          case s: String => s
+          case illegal =>
+            throw new IllegalArgumentException(s"$key must be a List of Strings; actually $illegal")
+        }
+      // a config that does not define the key contributes nothing
+      case (accum, null) =>
+        accum
+      // report only the offending value, not the (accumulator, value) pair
+      case (_, illegal) =>
+        throw new IllegalArgumentException(s"$key must be a List of Strings; actually $illegal")
+    }
+    if (distincts.nonEmpty) {
+      val decorators: JList[String] = new JArrayList(distincts.size)
+      decorators.addAll(distincts)
+      mergedConfig.put(key, decorators)
+    }
+    mergedConfig
+  }
+
+  /**
+   * Merges the kryo registrations declared by component configs and existing configs.
+   * Each registration is either a String, stored with a null value, or a Map of Strings.
+   * On duplicate keys the entry seen later wins.
+   *
+   * @param componentConfigs list of component configs
+   * @param allConfig existing configs without merging component configs
+   * @return a merged config with component config overriding existing configs;
+   *         empty if nothing is registered
+   * @throws IllegalArgumentException if a config value has an unexpected shape
+   */
+  private def getMergedKryoRegister(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
+      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
+    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1)
+    val key = Config.TOPOLOGY_KRYO_REGISTER
+    val configs = getConfigValues(componentConfigs, allConfig, key)
+    val merged = configs.foldLeft(Map.empty[String, String]) {
+      case (accum, config: JIterable[_]) =>
+        // fold from the accumulator rather than reduce, so that an empty register list
+        // contributes nothing instead of throwing UnsupportedOperationException
+        config.map {
+          case m: JMap[_, _] =>
+            m.map {
+              case (k: String, v: String) => k -> v
+              case illegal =>
+                throw new IllegalArgumentException(
+                  s"each element of $key must be a String or a Map of Strings; actually $illegal")
+            }
+          case s: String =>
+            Map(s -> null)
+          case illegal =>
+            throw new IllegalArgumentException(s"each element of $key must be a String or " +
+              s"a Map of Strings; actually $illegal")
+        }.foldLeft(accum)(_ ++ _)
+      // a config that does not define the key contributes nothing
+      case (accum, null) =>
+        accum
+      case (accum, illegal) =>
+        throw new IllegalArgumentException(
+          s"$key must be an Iterable containing only Strings or Maps of Strings; actually $illegal")
+    }
+    if (merged.nonEmpty) {
+      val registers: JMap[String, String] = new JHashMap[String, String](merged.size)
+      registers.putAll(merged)
+      mergedConfig.put(key, registers)
+    }
+    mergedConfig
+  }
+
+  /**
+   * Looks up one key in every component config and in the existing configs.
+   * A component config that lacks the key contributes a null value.
+   *
+   * @param componentConfigs list of raw component configs
+   * @param allConfig existing configs without merging component configs
+   * @param key config key
+   * @return a list of values for a config from both component configs and existing configs
+   */
+  private def getConfigValues(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
+      allConfig: Map[AnyRef, AnyRef], key: String): Iterable[AnyRef] = {
+    val componentValues = componentConfigs.map(_.get(key))
+    val existingValue = allConfig.get(key).toList
+    componentValues ++ existingValue
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
new file mode 100644
index 0000000..eb61acb
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.topology
+
+import java.util.{List => JList}
+
+import backtype.storm.task.GeneralTopologyContext
+import backtype.storm.tuple.{Tuple, TupleImpl}
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * Carries Storm tuple values in the Gearpump world.
+ * The targetPartitions field dictates which tasks a GearpumpTuple should be sent to;
+ * see [[org.apache.gearpump.experiments.storm.partitioner.StormPartitioner]] for more info.
+ * targetPartitions is marked transient and takes no part in equals / hashCode.
+ */
+private[storm] class GearpumpTuple(
+    val values: JList[AnyRef],
+    val sourceTaskId: Integer,
+    val sourceStreamId: String,
+    @transient val targetPartitions: Map[String, Array[Int]]) extends Serializable {
+
+  /**
+   * Creates a Storm [[backtype.storm.tuple.Tuple]] to be passed to a Storm component.
+   * This is needed for each incoming message
+   * because we cannot get [[backtype.storm.task.GeneralTopologyContext]] at deserialization.
+   * @param topologyContext topology context used for all tasks
+   * @param timestamp timestamp attached to the created tuple
+   * @return a Tuple
+   */
+  def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = {
+    TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, timestamp)
+  }
+
+  def canEqual(other: Any): Boolean = other match {
+    case _: GearpumpTuple => true
+    case _ => false
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case that: GearpumpTuple if that.canEqual(this) =>
+      values == that.values &&
+        sourceTaskId == that.sourceTaskId &&
+        sourceStreamId == that.sourceStreamId
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    // same value as folding the three hash codes from 0 with (a, b) => 31 * a + b
+    31 * (31 * values.hashCode() + sourceTaskId.hashCode()) + sourceStreamId.hashCode()
+  }
+}
+
+/**
+ * A Storm [[backtype.storm.tuple.TupleImpl]] that additionally carries a timestamp.
+ * The last superclass constructor argument is always passed as null.
+ */
+case class TimedTuple(topologyContext: GeneralTopologyContext, tuple: JList[AnyRef],
+    sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp)
+  extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null)
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
new file mode 100644
index 0000000..777acab
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import org.apache.gearpump.experiments.storm.partitioner.StormPartitioner
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology
+import org.apache.gearpump.partitioner.Partitioner
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.task.Task
+import org.apache.gearpump.util.Graph
+
+object GraphBuilder {
+
+  /**
+   * Builds a Gearpump DAG from a Storm topology: every processor of the topology gets an edge
+   * to each of its targets, using a StormPartitioner created for the target id.
+   * @param topology a wrapper over Storm topology
+   * @return a DAG
+   */
+  def build(topology: GearpumpStormTopology): Graph[Processor[_ <: Task], _ <: Partitioner] = {
+    val dag = Graph.empty[Processor[Task], Partitioner]
+
+    for {
+      (sourceId, sourceProcessor) <- topology.getProcessors
+      (targetId, targetProcessor) <- topology.getTargets(sourceId)
+    } {
+      dag.addEdge(sourceProcessor, new StormPartitioner(targetId), targetProcessor)
+    }
+
+    dag
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala
new file mode 100644
index 0000000..1d04af6
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import java.util.{List => JList}
+import scala.util.Random
+
+import backtype.storm.generated.GlobalStreamId
+import backtype.storm.grouping.CustomStreamGrouping
+import backtype.storm.task.TopologyContext
+import backtype.storm.tuple.Fields
+
+/**
+ * Grouper is identical to that in storm but returns gearpump partitions for storm tuple values
+ */
+sealed trait Grouper {
+  /**
+   * @param taskId storm task id of source task
+   * @param values storm tuple values
+   * @return an array of gearpump partitions
+   */
+  def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int]
+}
+
+/**
+ * GlobalGrouper sends every tuple to partition 0, whatever the source task and values are
+ */
+class GlobalGrouper extends Grouper {
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+    Array(0)
+  }
+}
+
+/**
+ * NoneGrouper picks a single partition at random for every tuple
+ *
+ * @param numTasks number of target tasks
+ */
+class NoneGrouper(numTasks: Int) extends Grouper {
+  private val random = new Random
+
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] =
+    Array(StormUtil.mod(random.nextInt, numTasks))
+}
+
+/**
+ * ShuffleGrouper shuffles partitions and returns them sequentially, and then shuffles again
+ * once every partition has been returned.
+ * The cursor and the current permutation are kept in plain vars without synchronization.
+ *
+ * @param numTasks number of target tasks
+ */
+class ShuffleGrouper(numTasks: Int) extends Grouper {
+  private val random = new Random
+  private var index = -1
+  // a Vector rather than a List, because partitions(index) is looked up for every tuple
+  private var partitions = Vector.empty[Int]
+
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+    index += 1
+    if (partitions.isEmpty) {
+      // first call: build and shuffle the initial permutation
+      partitions = random.shuffle(0.until(numTasks).toVector)
+    } else if (index >= numTasks) {
+      // the whole permutation has been handed out: start over with a new one
+      index = 0
+      partitions = random.shuffle(partitions)
+    }
+    Array(partitions(index))
+  }
+}
+
+/**
+ * FieldsGrouper derives the partition from the hash code of the grouping field values
+ *
+ * @param outFields declared output fields of source task
+ * @param groupFields grouping fields of target tasks
+ * @param numTasks number of target tasks
+ */
+class FieldsGrouper(outFields: Fields, groupFields: Fields, numTasks: Int) extends Grouper {
+
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+    val groupValues = outFields.select(groupFields, values)
+    Array(StormUtil.mod(groupValues.hashCode(), numTasks))
+  }
+}
+
+/**
+ * AllGrouper returns all partitions, i.e. 0 until numTasks; the same array instance is
+ * returned on every call
+ *
+ * @param numTasks number of target tasks
+ */
+class AllGrouper(numTasks: Int) extends Grouper {
+  val partitions = Array.range(0, numTasks)
+
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = partitions
+}
+
+/**
+ * CustomGrouper delegates the grouping strategy to a user supplied grouping
+ *
+ * @param grouping see [[backtype.storm.grouping.CustomStreamGrouping]]
+ */
+class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper {
+
+  /**
+   * Forwards the arguments unchanged to the prepare method of the wrapped grouping.
+   */
+  def prepare(
+      topologyContext: TopologyContext, globalStreamId: GlobalStreamId, targetTasks: JList[Integer])
+    : Unit = {
+    grouping.prepare(topologyContext, globalStreamId, targetTasks)
+  }
+
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+    val tasks = grouping.chooseTasks(taskId, values)
+    val taskIter = tasks.iterator()
+    // one partition per chosen storm task, in the order the grouping returned them
+    Array.fill(tasks.size()) {
+      StormUtil.stormTaskIdToGearpump(taskIter.next()).index
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala
new file mode 100644
index 0000000..f1c736c
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+/**
+ * Constants shared by the Storm compatibility layer
+ */
+object StormConstants {
+  val STORM_COMPONENT: String = "storm_component"
+  val STORM_TOPOLOGY: String = "storm_topology"
+  val STORM_CONFIG: String = "storm_config"
+  val SYSTEM_COMPONENT_ID: String = "__system"
+  val SYSTEM_COMPONENT_OUTPUT_FIELDS: String = "rate_secs"
+  val SYSTEM_TASK_ID: Integer = -1
+  val SYSTEM_TICK_STREAM_ID: String = "__tick"
+
+  val CHECKPOINT_INTERVAL_MILLIS: Int = 2000 // 2 seconds
+
+  val STORM_SERIALIZATION_FRAMEWORK: String = "gearpump.storm.serialization-framework"
+}