You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:35 UTC
[25/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
deleted file mode 100644
index 554210c..0000000
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
-import java.lang.{Boolean => JBoolean}
-import java.util.{HashMap => JHashMap, Map => JMap}
-
-import akka.actor.ActorSystem
-import backtype.storm.Config
-import backtype.storm.generated._
-import org.apache.storm.shade.org.json.simple.JSONValue
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
-import io.gearpump.experiments.storm.topology._
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.streaming.task.{TaskContext, TaskId}
-import io.gearpump.util.Util
-
-/**
- * Utilities for running Storm topologies on Gearpump: conversion between Storm and
- * Gearpump task ids, typed lookups in Storm config maps, and small helpers.
- */
-object StormUtil {
-
- /**
- * Convert storm task id to gearpump [[io.gearpump.streaming.task.TaskId]]
- *
- * The high 16 bit of an Int is TaskId.processorId
- * The low 16 bit of an Int is TaskId.index
- */
- def stormTaskIdToGearpump(id: Integer): TaskId = {
- val index = id & 0xFFFF
- // arithmetic shift, so the sign bit of `id` carries into processorId
- val processorId = id >> 16
- TaskId(processorId, index)
- }
-
- /**
- * convert gearpump [[TaskId]] to storm task id
- * TaskId.processorId is the high 16 bit of an Int
- * TaskId.index is the low 16 bit of an Int
- *
- * Intended as the inverse of stormTaskIdToGearpump. Only the index is masked;
- * processorId bits beyond 16 are not checked and overflow the Int.
- */
- def gearpumpTaskIdToStorm(taskId: TaskId): Integer = {
- val index = taskId.index
- val processorId = taskId.processorId
- (processorId << 16) + (index & 0xFFFF)
- }
-
- /**
- * Builds the Storm component this task runs: STORM_COMPONENT is looked up among the
- * topology's spouts first, then among its bolts.
- *
- * @return a configured [[GearpumpStormComponent]]
- * @throws Exception if the component id is neither a spout nor a bolt; missing
- * STORM_TOPOLOGY / STORM_CONFIG / STORM_COMPONENT entries fail on `.get`
- */
- def getGearpumpStormComponent(
- taskContext: TaskContext, conf: UserConfig)(implicit system: ActorSystem)
- : GearpumpStormComponent = {
- val topology = conf.getValue[StormTopology](STORM_TOPOLOGY).get
- val stormConfig = conf.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get
- val componentId = conf.getString(STORM_COMPONENT).get
- val spouts = topology.get_spouts
- val bolts = topology.get_bolts
- if (spouts.containsKey(componentId)) {
- GearpumpSpout(topology, stormConfig, spouts.get(componentId), taskContext)
- } else if (bolts.containsKey(componentId)) {
- GearpumpBolt(topology, stormConfig, bolts.get(componentId), taskContext)
- } else {
- throw new Exception(s"storm component $componentId not found")
- }
- }
-
- /**
- * Parses config in json to map, returns empty map for invalid json string
- * (a null string, or JSON whose top level is not an object, also yields a new empty map)
- *
- * @param json config in json
- * @return config in map
- */
- def parseJsonStringToMap(json: String): JMap[AnyRef, AnyRef] = {
- Option(json).flatMap(json => JSONValue.parse(json) match {
- case m: JMap[_, _] => Option(m.asInstanceOf[JMap[AnyRef, AnyRef]])
- case _ => None
- }).getOrElse(new JHashMap[AnyRef, AnyRef])
- }
-
- /**
- * get Int value of the config name
- *
- * @return None if the key is absent or mapped to null; any java.lang.Number is
- * accepted and truncated with intValue, despite the error message saying Integer
- * @throws IllegalArgumentException if the value is not a Number
- */
- def getInt(conf: JMap[_, _], name: String): Option[Int] = {
- Option(conf.get(name)).map {
- case number: Number => number.intValue
- case invalid => throw new IllegalArgumentException(
- s"$name must be Java Integer; actual: ${invalid.getClass}")
- }
- }
-
- /**
- * get Boolean value of the config name
- *
- * @return None if the key is absent or mapped to null
- * @throws IllegalArgumentException if the value is not a java.lang.Boolean
- */
- def getBoolean(conf: JMap[_, _], name: AnyRef): Option[Boolean] = {
- Option(conf.get(name)).map {
- case b: JBoolean => b.booleanValue()
- case invalid => throw new IllegalArgumentException(
- s"$name must be a Java Boolean; acutal: ${invalid.getClass}")
- }
- }
-
- /**
- * clojure mod ported from Storm
- * see https://clojuredocs.org/clojure.core/mod
- * For positive `div` the result is in [0, div) even for negative `num`.
- */
- def mod(num: Int, div: Int): Int = {
- (num % div + div) % div
- }
-
- /**
- * Acking is off when TOPOLOGY_ACKER_EXECUTORS is absent; otherwise it is on unless the
- * value is exactly 0 (a null value counts as on).
- */
- def ackEnabled(config: JMap[AnyRef, AnyRef]): Boolean = {
- if (config.containsKey(Config.TOPOLOGY_ACKER_EXECUTORS)) {
- getInt(config, Config.TOPOLOGY_ACKER_EXECUTORS).map(_ != 0).getOrElse(true)
- } else {
- false
- }
- }
-
- /** Returns a free local port from Util.findFreePort, throwing if none is found. */
- def getThriftPort(): Int = {
- Util.findFreePort().getOrElse(
- throw new Exception("unable to find free port for thrift server"))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala
new file mode 100644
index 0000000..51760ba
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm
+
+import org.apache.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient}
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+object StormRunner {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  /** Sub-commands understood by this launcher, keyed by their command-line name. */
+  private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient)
+
+  /** Prints the supported sub-commands to stderr. */
+  private def usage(): Unit = {
+    val choices = commands.keys.toList.sorted.mkString("|")
+    // scalastyle:off println
+    Console.err.println("Usage: " + "<" + choices + ">")
+    // scalastyle:on println
+  }
+
+  /** Runs `command` with `commandArgs`, or prints usage when the command is unknown. */
+  private def executeCommand(command: String, commandArgs: Array[String]): Unit = {
+    commands.get(command) match {
+      case Some(app) => app.main(commandArgs)
+      case None => usage()
+    }
+  }
+
+  /**
+   * Entry point. The first argument selects the sub-command and the remaining arguments
+   * are forwarded to it. Prints usage when no argument is given.
+   */
+  def main(args: Array[String]): Unit = {
+    args.headOption match {
+      case Some(command) => executeCommand(command, args.tail)
+      case None => usage()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
new file mode 100644
index 0000000..544a4eb
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.main
+
+import java.io.{File, FileOutputStream, FileWriter}
+import java.nio.ByteBuffer
+import java.nio.channels.{Channels, WritableByteChannel}
+import java.util.{HashMap => JHashMap, Map => JMap, UUID}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+import akka.actor.ActorSystem
+import com.typesafe.config.ConfigValueFactory
+import backtype.storm.Config
+import backtype.storm.generated._
+import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
+import backtype.storm.utils.Utils
+import org.apache.storm.shade.org.json.simple.JSONValue
+import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology
+import org.apache.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback
+import org.apache.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil, TimeCacheMapWrapper}
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
+
+object GearpumpNimbus extends AkkaApp with ArgumentsParser {
+  private val THRIFT_PORT = StormUtil.getThriftPort()
+  private val OUTPUT = "output"
+  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    OUTPUT -> CLIOption[String]("<output path for configuration file>",
+      required = false, defaultValue = Some("app.yaml"))
+  )
+
+  /**
+   * Starts a Storm Nimbus Thrift server that forwards requests to the Gearpump cluster.
+   *
+   * The nimbus host and Thrift port are merged into the YAML file named by the `output`
+   * option, so Storm clients can use that file as their config. Blocks until the actor
+   * system terminates. If the Thrift server fails, the failure is logged and the actor
+   * system is terminated, so this method returns instead of hanging.
+   */
+  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    val output = parsed.getString(OUTPUT)
+    val akkaConf = updateClientConfig(inputAkkaConf)
+    val system = ActorSystem("storm", akkaConf)
+
+    val clientContext = new ClientContext(akkaConf, system, null)
+    val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
+    val thriftConf: JMap[AnyRef, AnyRef] = Map(
+      Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME),
+      Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava.asInstanceOf[JMap[AnyRef, AnyRef]]
+    updateStormConfig(thriftConf, output)
+    stormConf.putAll(thriftConf)
+
+    import scala.concurrent.ExecutionContext.Implicits.global
+    val server = Future {
+      // serve() blocks for the server's lifetime; tell the pool this thread is blocked.
+      scala.concurrent.blocking {
+        val thriftServer = createServer(clientContext, stormConf)
+        thriftServer.serve()
+      }
+    }
+    // Without this, a failure to create or run the Thrift server (e.g. the port is already
+    // bound) would be silently dropped, and main would wait forever on a dead nimbus.
+    server.failed.foreach { e =>
+      LOG.error("Storm Thrift server failed, shutting down", e)
+      system.terminate()
+    }
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  /** Creates a Nimbus Thrift server whose request handler is a [[GearpumpNimbus]]. */
+  private def createServer(
+      clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = {
+    val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext,
+      stormConf))
+    val connectionType = ThriftConnectionType.NIMBUS
+    new ThriftServer(stormConf, processor, connectionType)
+  }
+
+  /**
+   * Overlays `thriftConfig` on the config read from `output` (via Storm's
+   * `Utils.findAndReadConfigFile`, which need not find it) and writes the result to
+   * `output` as YAML.
+   */
+  private def updateStormConfig(thriftConfig: JMap[AnyRef, AnyRef], output: String): Unit = {
+    val updatedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]]
+    updatedConfig.putAll(outputConfig)
+    updatedConfig.putAll(thriftConfig)
+    val serialized = new Yaml().dumpAsMap(updatedConfig)
+    val writer = new FileWriter(new File(output))
+    try {
+      writer.write(serialized)
+    } finally {
+      writer.close()
+    }
+  }
+
+  import org.apache.gearpump.util.Constants._
+
+  /**
+   * Prepends the Storm libraries under GEARPUMP_HOME to the appmaster and executor extra
+   * classpaths. When STORM_SERIALIZATION_FRAMEWORK is configured, its value is also used
+   * as the Gearpump serializer pool.
+   */
+  private def updateClientConfig(config: Config): Config = {
+    val storm = s"<${GEARPUMP_HOME}>/lib/storm/*"
+    val appClassPath = s"$storm${File.pathSeparator}" +
+      config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
+    val executorClassPath = s"$storm${File.pathSeparator}" +
+      config.getString(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
+
+    val updated = config
+      .withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath))
+      .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH,
+        ConfigValueFactory.fromAnyRef(executorClassPath))
+
+    if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) {
+      val serializerConfig = ConfigValueFactory.fromAnyRef(
+        config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
+      updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig)
+    } else {
+      updated
+    }
+  }
+}
+
+/**
+ * Nimbus Thrift service implementation backed by a Gearpump cluster.
+ *
+ * Storm topologies submitted by clients are converted into Gearpump `StreamApplication`s
+ * and submitted through `clientContext`. Uploaded jars are written to local temp files.
+ * Topology bookkeeping (name -> application id, name -> topology data) is kept in memory.
+ * Operations Gearpump does not support throw `UnsupportedOperationException`.
+ *
+ * @param clientContext client used to submit, list and shut down Gearpump applications
+ * @param stormConf Storm configuration returned by `getNimbusConf`
+ */
+class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef])
+  extends Nimbus.Iface {
+
+  import org.apache.gearpump.experiments.storm.main.GearpumpNimbus._
+
+  // topology name -> Gearpump application id
+  private var applications = Map.empty[String, Int]
+  // topology name -> submitted topology, its storm config and the uploaded jar path
+  private var topologies = Map.empty[String, TopologyData]
+  private val expireSeconds = StormUtil.getInt(stormConf,
+    Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get
+  // Invoked for expired upload entries; closes the abandoned upload channel.
+  private val expiredCallback = new Callback[String, WritableByteChannel] {
+    override def expire(k: String, v: WritableByteChannel): Unit = {
+      v.close()
+    }
+  }
+  // upload location -> open channel of an upload in progress
+  private val fileCacheMap = new TimeCacheMapWrapper[String, WritableByteChannel](expireSeconds,
+    expiredCallback)
+
+  override def submitTopology(
+      name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology)
+    : Unit = {
+    submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology,
+      new SubmitOptions(TopologyInitialStatus.ACTIVE))
+  }
+
+  /**
+   * Converts `topology` into a Gearpump application and submits it with the jar previously
+   * uploaded to `uploadedJarLocation`. `options` is not used.
+   *
+   * @throws AlreadyAliveException if a topology named `name` is still tracked as running
+   */
+  override def submitTopologyWithOpts(
+      name: String, uploadedJarLocation: String,
+      jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = {
+    // Forget topologies whose applications are no longer active, so their names can be reused.
+    updateApps()
+    if (applications.contains(name)) {
+      // Overwriting the entry would orphan the running application: it could no longer be
+      // killed by name, and its uploaded jar would never be deleted.
+      throw new AlreadyAliveException(s"topology $name already exists")
+    }
+    LOG.info(s"Submitted topology $name")
+    implicit val system = clientContext.system
+    val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
+    val stormConfig = gearpumpStormTopology.getStormConfig
+    val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS).getOrElse(1)
+    val processorGraph = GraphBuilder.build(gearpumpStormTopology)
+    val config = UserConfig.empty
+      .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
+      .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
+    val app = StreamApplication(name, processorGraph, config)
+    LOG.info(s"jar file uploaded to $uploadedJarLocation")
+    val appId = clientContext.submit(app, uploadedJarLocation, workerNum)
+    applications += name -> appId
+    topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation)
+    LOG.info(s"Storm Application $appId submitted")
+  }
+
+  /** Shuts down the application for `name` and deletes its jar; throws if `name` is unknown. */
+  override def killTopologyWithOpts(name: String, options: KillOptions): Unit = {
+    if (applications.contains(name)) {
+      clientContext.shutdown(applications(name))
+      removeTopology(name)
+      LOG.info(s"Killed topology $name")
+    } else {
+      throw new RuntimeException(s"topology $name not found")
+    }
+  }
+
+  override def getNimbusConf: String = {
+    JSONValue.toJSONString(stormConf)
+  }
+
+  override def getTopology(name: String): StormTopology = {
+    updateApps()
+    topologies.getOrElse(name,
+      throw new RuntimeException(s"topology $name not found")).topology
+  }
+
+  override def getTopologyConf(name: String): String = {
+    updateApps()
+    JSONValue.toJSONString(topologies.getOrElse(name,
+      throw new RuntimeException(s"topology $name not found")).config)
+  }
+
+  override def getUserTopology(id: String): StormTopology = getTopology(id)
+
+  /** Creates a temp jar file, opens a channel to it, and returns its path as the upload id. */
+  override def beginFileUpload(): String = {
+    val file = File.createTempFile(s"storm-jar-${UUID.randomUUID()}", ".jar")
+    val location = file.getAbsolutePath
+    val channel = Channels.newChannel(new FileOutputStream(location))
+    fileCacheMap.put(location, channel)
+    LOG.info(s"Uploading file from client to $location")
+    location
+  }
+
+  override def uploadChunk(location: String, chunk: ByteBuffer): Unit = {
+    if (!fileCacheMap.containsKey(location)) {
+      throw new RuntimeException(s"File for $location does not exist (or timed out)")
+    } else {
+      val channel = fileCacheMap.get(location)
+      channel.write(chunk)
+      // re-put the entry after writing
+      fileCacheMap.put(location, channel)
+    }
+  }
+
+  override def finishFileUpload(location: String): Unit = {
+    if (!fileCacheMap.containsKey(location)) {
+      throw new RuntimeException(s"File for $location does not exist (or timed out)")
+    } else {
+      val channel = fileCacheMap.get(location)
+      channel.close()
+      fileCacheMap.remove(location)
+    }
+  }
+
+  /** Lists tracked topologies by name; no supervisor or per-topology metrics are reported. */
+  override def getClusterInfo: ClusterSummary = {
+    updateApps()
+    val topologySummaryList = topologies.map { case (name, _) =>
+      new TopologySummary(name, name, 0, 0, 0, 0, "")
+    }.toSeq
+    new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava)
+  }
+
+  override def beginFileDownload(file: String): String = {
+    throw new UnsupportedOperationException
+  }
+
+  override def uploadNewCredentials(s: String, credentials: Credentials): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def activate(name: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def rebalance(name: String, options: RebalanceOptions): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def deactivate(name: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getTopologyInfo(name: String): TopologyInfo = {
+    throw new UnsupportedOperationException
+  }
+
+  override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = {
+    throw new UnsupportedOperationException
+  }
+
+  override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions())
+
+  override def downloadChunk(name: String): ByteBuffer = {
+    throw new UnsupportedOperationException
+  }
+
+  /**
+   * Drops bookkeeping for tracked topologies whose application is listed by the cluster
+   * with a status other than AppMasterActive.
+   */
+  private def updateApps(): Unit = {
+    clientContext.listApps.appMasters.foreach { app =>
+      val name = app.appName
+      if (applications.contains(name)) {
+        if (app.status != MasterToAppMaster.AppMasterActive) {
+          removeTopology(name)
+        }
+      }
+    }
+  }
+
+  /** Forgets topology `name` and deletes its uploaded jar; `name` must be tracked. */
+  private def removeTopology(name: String): Unit = {
+    applications -= name
+    val jar = topologies(name).jar
+    new File(jar).delete()
+    topologies -= name
+  }
+}
+
+/**
+ * Bookkeeping for a topology submitted through the Gearpump nimbus.
+ *
+ * @param topology the Storm topology as submitted
+ * @param config the topology's Storm configuration
+ * @param jar local path of the uploaded topology jar
+ */
+case class TopologyData(
+    topology: StormTopology,
+    config: JMap[AnyRef, AnyRef],
+    jar: String)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala
new file mode 100644
index 0000000..1cfd5a4
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.main
+
+import backtype.storm.Config
+import backtype.storm.utils.Utils
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{AkkaApp, LogUtil, Util}
+
+object GearpumpStormClient extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "jar" -> CLIOption[String]("<storm jar>", required = true),
+    // a file path, read with getString below
+    "config" -> CLIOption[String]("<storm config file>", required = true),
+    "verbose" -> CLIOption("<print verbose log on console>", required = false,
+      defaultValue = Some(false)))
+
+  /**
+   * Runs a Storm topology main class against the Gearpump nimbus.
+   *
+   * The first remaining argument is the topology main class and the rest are passed to it.
+   * The class runs in a child process whose classpath holds the Gearpump and Storm
+   * libraries plus the `jar`, with the nimbus host/port taken from the `config` file.
+   *
+   * @throws IllegalArgumentException if no topology main class is given
+   * @throws Exception if the child process exits with a non-zero code
+   */
+  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+
+    val verbose = config.getBoolean("verbose")
+    if (verbose) {
+      LogUtil.verboseLogToConsole()
+    }
+
+    val jar = config.getString("jar")
+    val stormConfig = config.getString("config")
+    val remainArgs = config.remainArgs
+    // Fail with a clear message rather than an ArrayIndexOutOfBoundsException.
+    if (remainArgs.isEmpty) {
+      throw new IllegalArgumentException(
+        "missing <topology main class> argument after the storm client options")
+    }
+    val topology = remainArgs(0)
+    val stormArgs = remainArgs.drop(1)
+    val stormOptions = Array(
+      s"-Dstorm.options=${getThriftOptions(stormConfig)}",
+      s"-Dstorm.jar=$jar",
+      s"-Dstorm.conf.file=$stormConfig",
+      s"-D${PREFER_IPV4}=true"
+    )
+
+    val gearpumpHome = System.getProperty(GEARPUMP_HOME)
+    val classPath = Array(s"$gearpumpHome/lib/*", s"$gearpumpHome/lib/storm/*", jar)
+    val process = Util.startProcess(stormOptions, classPath, topology, stormArgs)
+
+    // Blocks until the child process exits.
+    val exit = process.exitValue()
+
+    if (exit != 0) {
+      throw new Exception(s"failed to submit jar, exit code $exit, " +
+        s"error summary: ${process.logger.error}")
+    }
+  }
+
+  /**
+   * Reads the nimbus host and Thrift port from the config file `stormConfig` (which must
+   * exist) and formats them as `<NIMBUS_HOST>=<host>,<NIMBUS_THRIFT_PORT>=<port>`.
+   */
+  private def getThriftOptions(stormConfig: String): String = {
+    val config = Utils.findAndReadConfigFile(stormConfig, true)
+    val host = config.get(Config.NIMBUS_HOST)
+    val thriftPort = config.get(Config.NIMBUS_THRIFT_PORT)
+    s"${Config.NIMBUS_HOST}=$host,${Config.NIMBUS_THRIFT_PORT}=$thriftPort"
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
new file mode 100644
index 0000000..aaa0a99
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
+import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner}
+
+/**
+ * Partitioner bound to a target Storm component
+ *
+ * Partitioning is already done in
+ * [[org.apache.gearpump.experiments.storm.util.StormOutputCollector]] and
+ * kept in "targetPartitions" of [[org.apache.gearpump.experiments.storm.topology.GearpumpTuple]]
+ * the partitioner just returns the partitions of the target
+ *
+ * In Gearpump, a message is sent from a task to all the subscribers.
+ * In Storm, however, a message is sent to one or more of the subscribers.
+ * Hence, we have to do the partitioning in
+ * [[org.apache.gearpump.experiments.storm.util.StormOutputCollector]] until the Storm way is
+ * supported in Gearpump.
+ *
+ * @param target target storm component id
+ */
+private[storm] class StormPartitioner(target: String) extends MulticastPartitioner {
+
+  /**
+   * Returns the partitions recorded for `target` in the message's GearpumpTuple, or a
+   * single UNKNOWN_PARTITION_ID when none were recorded.
+   */
+  override def getPartitions(
+      msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+    val tuple = msg.msg.asInstanceOf[GearpumpTuple]
+    tuple.targetPartitions.getOrElse(target, Array(Partitioner.UNKNOWN_PARTITION_ID))
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
new file mode 100644
index 0000000..a70ce48
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.processor
+
+import java.util.{Collection => JCollection, List => JList}
+
+import backtype.storm.task.IOutputCollector
+import backtype.storm.tuple.Tuple
+import org.apache.gearpump.experiments.storm.topology.TimedTuple
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.experiments.storm.util.StormOutputCollector
+import org.apache.gearpump.streaming.task.ReportCheckpointClock
+
+/**
+ * IOutputCollector handed to a Storm bolt. Emits are forwarded to the wrapped
+ * [[StormOutputCollector]]; when acking is enabled, acks of TimedTuples drive
+ * ReportCheckpointClock messages to the AppMaster.
+ *
+ * @param collector collector that performs the actual emits
+ * @param ackEnabled when false, ack() does nothing
+ */
+private[storm] class StormBoltOutputCollector(collector: StormOutputCollector,
+ ackEnabled: Boolean = false) extends IOutputCollector {
+ // earliest checkpoint time that may be reported next; moved one interval ahead per report
+ private var reportTime = 0L
+ // largest timestamp among acked TimedTuples
+ private var maxAckTime = 0L
+
+ // anchors are ignored; returns what the wrapped collector returns
+ // (presumably the target task ids, per Storm's IOutputCollector contract — confirm)
+ override def emit(
+ streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): JList[Integer] = {
+ collector.emit(streamId, tuple)
+ }
+
+ // anchors are ignored
+ override def emitDirect(
+ taskId: Int, streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): Unit = {
+ collector.emitDirect(taskId, streamId, tuple)
+ }
+
+ override def fail(tuple: Tuple): Unit = {
+ // application failure, throw exception such that the tuple can be replayed
+ // Note: do not print the tuple which will trigger NPE since its messageId is null
+ throw new Exception("Storm Bolt.execute failed")
+ }
+
+ /**
+ * Records the acked tuple's timestamp. When the task's upstream min clock has reached
+ * `reportTime` and does not exceed the newest acked timestamp, that clock (rounded down
+ * to a multiple of CHECKPOINT_INTERVAL_MILLIS) is sent to the AppMaster as a
+ * ReportCheckpointClock. Only TimedTuples count, and only when ackEnabled is true.
+ */
+ override def ack(tuple: Tuple): Unit = {
+ if (ackEnabled) {
+ tuple match {
+ case timedTuple: TimedTuple =>
+ maxAckTime = Math.max(maxAckTime, timedTuple.timestamp)
+ val taskContext = collector.taskContext
+ val upstreamMinClock = taskContext.upstreamMinClock
+ if (reportTime <= upstreamMinClock && upstreamMinClock <= maxAckTime) {
+ // round down to the checkpoint interval
+ reportTime = upstreamMinClock / CHECKPOINT_INTERVAL_MILLIS * CHECKPOINT_INTERVAL_MILLIS
+ taskContext.appMaster ! ReportCheckpointClock(taskContext.taskId, reportTime)
+ // the next report needs the clock to reach the following interval boundary
+ reportTime += CHECKPOINT_INTERVAL_MILLIS
+ }
+ case _ =>
+ // ignore other tuples
+ }
+ }
+ }
+
+ // errors reported by the bolt are rethrown to the caller
+ override def reportError(throwable: Throwable): Unit = {
+ throw throwable
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
new file mode 100644
index 0000000..1d3048e
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.processor
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.Duration
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
+import org.apache.gearpump.experiments.storm.util._
+import org.apache.gearpump.streaming.task._
+
+object StormProcessor {
+  /** Message the processor sends to itself to trigger the bolt's periodic tick. */
+  private[storm] val TICK: Message = Message("tick")
+}
+
+/**
+ * Runtime container that hosts a Storm bolt as a Gearpump task.
+ */
+private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt,
+    taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+  import org.apache.gearpump.experiments.storm.processor.StormProcessor._
+
+  /** Builds the bolt named in `conf` via StormUtil.getGearpumpStormComponent. */
+  def this(taskContext: TaskContext, conf: UserConfig) = {
+    this(StormUtil.getGearpumpStormComponent(taskContext, conf)(taskContext.system)
+      .asInstanceOf[GearpumpBolt], taskContext, conf)
+  }
+
+  // tick frequency in seconds; None means no ticks are scheduled
+  private val freqOpt = gearpumpBolt.getTickFrequency
+
+  override def onStart(startTime: StartTime): Unit = {
+    gearpumpBolt.start(startTime)
+    freqOpt.foreach(scheduleTick)
+  }
+
+  /** TICK ticks the bolt and re-arms the timer; any other message goes to the bolt. */
+  override def onNext(message: Message): Unit = {
+    if (message == TICK) {
+      freqOpt.foreach(onTick)
+    } else {
+      gearpumpBolt.next(message)
+    }
+  }
+
+  /** Delivers one tick at frequency `freq` to the bolt and schedules the next one. */
+  private def onTick(freq: Int): Unit = {
+    gearpumpBolt.tick(freq)
+    scheduleTick(freq)
+  }
+
+  /** Sends TICK to this task after `freq` seconds. */
+  private def scheduleTick(freq: Int): Unit = {
+    taskContext.scheduleOnce(Duration(freq, TimeUnit.SECONDS)) {
+      self ! TICK
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
new file mode 100644
index 0000000..5d4a6a2
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.producer
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.Actor.Receive
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
+import org.apache.gearpump.experiments.storm.util._
+import org.apache.gearpump.streaming.task._
+
+import scala.concurrent.duration.Duration
+
+object StormProducer {
+  /** Self-addressed message that triggers a message-timeout check on the spout. */
+  private[storm] val TIMEOUT: Message = Message("timeout")
+}
+
+/**
+ * Runtime container that hosts a Storm spout inside a Gearpump task.
+ *
+ * The spout is polled through a self-addressed message loop. `onStart` sends one message
+ * to this task. Handling any non-timeout message calls the spout's `nextTuple` and enqueues
+ * exactly one follow-up "continue" message. Message timeouts and checkpoint-clock polling
+ * run on separate one-shot timers.
+ */
+private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
+    taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+  import org.apache.gearpump.experiments.storm.producer.StormProducer._
+
+  /** Builds the spout wrapper from the task config via `StormUtil.getGearpumpStormComponent`. */
+  def this(taskContext: TaskContext, conf: UserConfig) = {
+    this(StormUtil.getGearpumpStormComponent(taskContext, conf)(taskContext.system)
+      .asInstanceOf[GearpumpSpout], taskContext, conf)
+  }
+
+  /** Message timeout in milliseconds; `None` when message timeouts are disabled. */
+  private val timeoutMillis = gearpumpSpout.getMessageTimeout
+
+  override def onStart(startTime: StartTime): Unit = {
+    gearpumpSpout.start(startTime)
+    if (gearpumpSpout.ackEnabled) {
+      // start polling the app master for the checkpoint clock used to ack spout messages
+      getCheckpointClock()
+    }
+    timeoutMillis.foreach(scheduleTimeout)
+    // start the single nextTuple loop handled in onNext
+    self ! Message("start")
+  }
+
+  override def onNext(msg: Message): Unit = {
+    msg match {
+      case TIMEOUT =>
+        timeoutMillis.foreach { timeout =>
+          gearpumpSpout.timeout(timeout)
+          scheduleTimeout(timeout)
+        }
+      case _ =>
+        gearpumpSpout.next(msg)
+        // Only the nextTuple loop re-enqueues itself. Sending "continue" after a TIMEOUT
+        // as well would start one more never-ending loop on every timeout.
+        self ! Message("continue")
+    }
+  }
+
+  /**
+   * Handles `CheckpointClock` (presumably the app master's reply to `GetCheckpointClock`).
+   * It passes the clock, if any, to the spout wrapper's `checkpoint` and then schedules
+   * the next poll.
+   */
+  override def receiveUnManagedMessage: Receive = {
+    case CheckpointClock(optClock) =>
+      optClock.foreach { clock =>
+        gearpumpSpout.checkpoint(clock)
+      }
+      getCheckpointClock()
+  }
+
+  /**
+   * Sends `GetCheckpointClock` to the app master after
+   * `StormConstants.CHECKPOINT_INTERVAL_MILLIS` milliseconds.
+   */
+  def getCheckpointClock(): Unit = {
+    taskContext.scheduleOnce(Duration(StormConstants.CHECKPOINT_INTERVAL_MILLIS,
+      TimeUnit.MILLISECONDS))(taskContext.appMaster ! GetCheckpointClock)
+  }
+
+  /** Delivers [[StormProducer.TIMEOUT]] to this task after `timeout` milliseconds. */
+  private def scheduleTimeout(timeout: Long): Unit = {
+    taskContext.scheduleOnce(Duration(timeout, TimeUnit.MILLISECONDS)) {
+      self ! TIMEOUT
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
new file mode 100644
index 0000000..5794b1d
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.producer
+
+import java.util.{List => JList}
+
+import backtype.storm.spout.{ISpout, ISpoutOutputCollector}
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.experiments.storm.util.StormOutputCollector
+
+/**
+ * A spout message whose ack or fail is still outstanding.
+ *
+ * @param id Storm message id later handed to `ISpout.ack` or `ISpout.fail`
+ * @param messageTime timestamp attached to the emitted tuples, compared to the checkpoint clock
+ * @param startTime wall-clock time of the emit, used for message-timeout checks
+ */
+case class PendingMessage(
+    id: Object,
+    messageTime: TimeStamp,
+    startTime: TimeStamp)
+
+/**
+ * `ISpoutOutputCollector` handed to a Storm spout so that it can emit messages.
+ *
+ * Tuples are forwarded to the wrapped [[StormOutputCollector]], stamped with the current
+ * wall-clock time. When acking is enabled, at most two reliable messages are tracked at once:
+ *  - `pendingMessage` is the oldest tracked message;
+ *  - `nextPendingMessage` is the message tracked after it.
+ * Any other reliable message is acked right away. `pendingMessage` is acked once the
+ * checkpoint clock reaches the timestamp of `nextPendingMessage`. It is failed once it is
+ * older than the message timeout.
+ *
+ * Emits without a message id (`messageId == null`) are unreliable in Storm. They are never
+ * tracked, acked or failed.
+ */
+private[storm] class StormSpoutOutputCollector(
+    collector: StormOutputCollector, spout: ISpout, ackEnabled: Boolean)
+  extends ISpoutOutputCollector {
+
+  // latest checkpoint clock passed to ackPendingMessage
+  private var checkpointClock = 0L
+  // oldest tracked message
+  private var pendingMessage: Option[PendingMessage] = None
+  // message tracked after pendingMessage; reset() promotes it to pendingMessage
+  private var nextPendingMessage: Option[PendingMessage] = None
+
+  override def emit(streamId: String, values: JList[AnyRef], messageId: Object): JList[Integer] = {
+    val curTime = System.currentTimeMillis()
+    collector.setTimestamp(curTime)
+    val outTasks = collector.emit(streamId, values)
+    setPendingOrAck(messageId, curTime, curTime)
+    outTasks
+  }
+
+  /** Rethrows the reported error to the caller instead of only recording it. */
+  override def reportError(throwable: Throwable): Unit = {
+    throw throwable
+  }
+
+  override def emitDirect(taskId: Int, streamId: String, values: JList[AnyRef], messageId: Object)
+    : Unit = {
+    val curTime = System.currentTimeMillis()
+    collector.setTimestamp(curTime)
+    collector.emitDirect(taskId, streamId, values)
+    setPendingOrAck(messageId, curTime, curTime)
+  }
+
+  /**
+   * Records a new checkpoint clock. Acks `pendingMessage` (and promotes `nextPendingMessage`)
+   * when the clock has reached the timestamp of `nextPendingMessage`.
+   *
+   * @param checkpointClock latest checkpoint clock
+   */
+  def ackPendingMessage(checkpointClock: TimeStamp): Unit = {
+    this.checkpointClock = checkpointClock
+    nextPendingMessage.foreach { case PendingMessage(_, messageTime, _) =>
+      if (messageTime <= this.checkpointClock) {
+        pendingMessage.foreach { case PendingMessage(id, _, _) =>
+          spout.ack(id)
+          reset()
+        }
+      }
+    }
+  }
+
+  /**
+   * Fails `pendingMessage` (and promotes `nextPendingMessage`) if it was emitted at least
+   * `timeoutMillis` milliseconds ago.
+   *
+   * @param timeoutMillis message timeout in milliseconds
+   */
+  def failPendingMessage(timeoutMillis: Long): Unit = {
+    pendingMessage.foreach { case PendingMessage(id, _, startTime) =>
+      if (System.currentTimeMillis() - startTime >= timeoutMillis) {
+        spout.fail(id)
+        reset()
+      }
+    }
+  }
+
+  /** Promotes `nextPendingMessage` to `pendingMessage`. */
+  private def reset(): Unit = {
+    pendingMessage = nextPendingMessage
+    nextPendingMessage = None
+  }
+
+  /**
+   * Tracks or immediately acks a freshly emitted message.
+   *
+   * A `null` message id marks an unreliable emit. Storm never acks or fails such tuples,
+   * and passing `null` to `spout.ack` may break spouts that cast the id.
+   */
+  private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp)
+    : Unit = {
+    if (messageId != null) {
+      if (ackEnabled) {
+        val newPendingMessage = PendingMessage(messageId, messageTime, startTime)
+        pendingMessage match {
+          case Some(msg) =>
+            if (nextPendingMessage.isEmpty && msg.messageTime <= this.checkpointClock) {
+              nextPendingMessage = Some(newPendingMessage)
+            } else {
+              spout.ack(messageId)
+            }
+          case None =>
+            pendingMessage = Some(newPendingMessage)
+        }
+      } else {
+        spout.ack(messageId)
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
new file mode 100644
index 0000000..d0f2949
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.topology
+
+import java.io.{File, FileOutputStream, IOException}
+import java.util
+import java.util.jar.JarFile
+import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
+
+import akka.actor.ActorRef
+import akka.pattern.ask
+import backtype.storm.Config
+import backtype.storm.generated.{Bolt, ComponentCommon, SpoutSpec, StormTopology}
+import backtype.storm.metric.api.IMetric
+import backtype.storm.spout.{ISpout, SpoutOutputCollector}
+import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
+import backtype.storm.tuple.{Fields, Tuple, TupleImpl}
+import backtype.storm.utils.Utils
+import clojure.lang.Atom
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.gearpump.experiments.storm.processor.StormBoltOutputCollector
+import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.experiments.storm.util.StormUtil._
+import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil}
+import org.apache.gearpump.streaming.DAG
+import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime}
+import org.apache.gearpump.util.{Constants, LogUtil}
+import org.apache.gearpump.{Message, TimeStamp}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{Await, Future}
+
+/**
+ * Common lifecycle of the wrappers around a Storm spout or bolt.
+ *
+ * Implementations adapt a Storm component's lifecycle to the Gearpump task that hosts it,
+ * so Gearpump applications need not deal with Storm directly.
+ */
+trait GearpumpStormComponent {
+
+  /**
+   * Called from the hosting task's `onStart`.
+   *
+   * @param startTime task start time
+   */
+  def start(startTime: StartTime): Unit
+
+  /**
+   * Called from the hosting task's `onNext` for each incoming message.
+   *
+   * @param message incoming message
+   */
+  def next(message: Message): Unit
+
+  /**
+   * Intended to be called from the hosting task's `onStop`. Does nothing by default.
+   * NOTE(review): no caller is visible in this revision; confirm whether tasks invoke it.
+   */
+  def stop(): Unit = ()
+}
+
+object GearpumpStormComponent {
+ private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormComponent])
+
+  object GearpumpSpout {
+
+    /**
+     * Builds a [[GearpumpSpout]] for one spout of a Storm topology.
+     *
+     * @param topology the whole Storm topology
+     * @param config Storm config; it is normalized with the spout's own component config
+     * @param spoutSpec spec of the spout to wrap
+     * @param taskContext context of the hosting Gearpump task
+     * @return the spout wrapper
+     */
+    def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef],
+        spoutSpec: SpoutSpec, taskContext: TaskContext): GearpumpSpout = {
+      val normalizedConfig = normalizeConfig(config.asScala.toMap, spoutSpec.get_common())
+      val spout = Utils.getSetComponentObject(spoutSpec.get_spout_object()).asInstanceOf[ISpout]
+      val ackEnabled = StormUtil.ackEnabled(config)
+      warnIfNotReplayable(spout, ackEnabled)
+
+      GearpumpSpout(
+        normalizedConfig,
+        spout,
+        askAppMasterForDAG,
+        (dag: DAG, taskId: TaskId) =>
+          buildTopologyContext(dag, topology, normalizedConfig, gearpumpTaskIdToStorm(taskId)),
+        (context: TaskContext, topologyContext: TopologyContext) =>
+          new StormSpoutOutputCollector(
+            StormOutputCollector(context, topologyContext), spout, ackEnabled),
+        ackEnabled,
+        taskContext)
+    }
+
+    /** Logs a warning when acking is requested for a spout not known to replay in order. */
+    private def warnIfNotReplayable(spout: ISpout, ackEnabled: Boolean): Unit = {
+      val className = spout.getClass.getName
+      if (ackEnabled && !isSequentiallyReplayableSpout(className)) {
+        LOG.warn(s"at least once is not supported for $className")
+      }
+    }
+
+    /** Only the Kafka spout is treated as sequentially replayable. */
+    private def isSequentiallyReplayableSpout(className: String): Boolean =
+      className == "storm.kafka.KafkaSpout"
+  }
+
+  /**
+   * Wraps a Storm `ISpout` and drives it from a Gearpump task.
+   *
+   * The DAG, topology context and output collector are obtained in `start`, through the
+   * injected factory functions.
+   */
+  case class GearpumpSpout(
+      config: JMap[AnyRef, AnyRef],
+      spout: ISpout,
+      getDAG: ActorRef => DAG,
+      getTopologyContext: (DAG, TaskId) => TopologyContext,
+      getOutputCollector: (TaskContext, TopologyContext) => StormSpoutOutputCollector,
+      ackEnabled: Boolean,
+      taskContext: TaskContext)
+    extends GearpumpStormComponent {
+
+    // assigned in start; null before that
+    private var collector: StormSpoutOutputCollector = null
+
+    override def start(startTime: StartTime): Unit = {
+      val topologyContext = getTopologyContext(getDAG(taskContext.appMaster), taskContext.taskId)
+      collector = getOutputCollector(taskContext, topologyContext)
+      spout.open(config, topologyContext, new SpoutOutputCollector(collector))
+    }
+
+    /** Every message, whatever its content, pulls one `nextTuple` from the spout. */
+    override def next(message: Message): Unit = spout.nextTuple()
+
+    /**
+     * @return the message timeout in milliseconds when message timeouts are enabled and
+     *         a timeout in seconds is configured; `None` otherwise
+     */
+    def getMessageTimeout: Option[Long] =
+      for {
+        timeoutEnabled <- StormUtil.getBoolean(config, Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)
+        if timeoutEnabled
+        timeoutSecs <- StormUtil.getInt(config, Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)
+      } yield timeoutSecs * 1000L
+
+    /** Acks pending spout messages according to the given checkpoint clock. */
+    def checkpoint(clock: TimeStamp): Unit = collector.ackPendingMessage(clock)
+
+    /** Fails the pending spout message if it is older than `timeoutMillis`. */
+    def timeout(timeoutMillis: Long): Unit = collector.failPendingMessage(timeoutMillis)
+  }
+
+  object GearpumpBolt {
+
+    /**
+     * Builds a [[GearpumpBolt]] for one bolt of a Storm topology.
+     *
+     * @param topology the whole Storm topology
+     * @param config Storm config; it is normalized with the bolt's own component config
+     * @param boltSpec spec of the bolt to wrap
+     * @param taskContext context of the hosting Gearpump task
+     * @return the bolt wrapper
+     */
+    def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef],
+        boltSpec: Bolt, taskContext: TaskContext): GearpumpBolt = {
+      val normalizedConfig = normalizeConfig(config.asScala.toMap, boltSpec.get_common())
+
+      def topologyContextFor(dag: DAG, taskId: TaskId): TopologyContext =
+        buildTopologyContext(dag, topology, normalizedConfig, gearpumpTaskIdToStorm(taskId))
+
+      def generalTopologyContextFor(dag: DAG): GeneralTopologyContext =
+        buildGeneralTopologyContext(dag, topology, normalizedConfig)
+
+      def outputCollectorFor(context: TaskContext, topologyContext: TopologyContext)
+        : StormOutputCollector = StormOutputCollector(context, topologyContext)
+
+      // the tick tuple carries the tick frequency as its only value
+      def tickTupleFor(context: GeneralTopologyContext, freq: Int): Tuple = {
+        val values = new util.ArrayList[Object] // Java list, as required by TupleImpl
+        values.add(Int.box(freq))
+        new TupleImpl(context, values, SYSTEM_TASK_ID, SYSTEM_TICK_STREAM_ID, null)
+      }
+
+      GearpumpBolt(
+        normalizedConfig,
+        Utils.getSetComponentObject(boltSpec.get_bolt_object()).asInstanceOf[IBolt],
+        askAppMasterForDAG,
+        topologyContextFor,
+        generalTopologyContextFor,
+        outputCollectorFor,
+        tickTupleFor,
+        taskContext)
+    }
+  }
+
+  /**
+   * Wraps a Storm `IBolt` and drives it from a Gearpump task.
+   *
+   * The DAG, topology contexts and output collector are obtained in `start`, through the
+   * injected factory functions. The tick tuple is built on the first `tick`.
+   */
+  case class GearpumpBolt(
+      config: JMap[AnyRef, AnyRef],
+      bolt: IBolt,
+      getDAG: ActorRef => DAG,
+      getTopologyContext: (DAG, TaskId) => TopologyContext,
+      getGeneralTopologyContext: DAG => GeneralTopologyContext,
+      getOutputCollector: (TaskContext, TopologyContext) => StormOutputCollector,
+      getTickTuple: (GeneralTopologyContext, Int) => Tuple,
+      taskContext: TaskContext)
+    extends GearpumpStormComponent {
+
+    // the first three are assigned in start, tickTuple on the first tick; all null before that
+    private var collector: StormOutputCollector = null
+    private var topologyContext: TopologyContext = null
+    private var generalTopologyContext: GeneralTopologyContext = null
+    private var tickTuple: Tuple = null
+
+    override def start(startTime: StartTime): Unit = {
+      val dag = getDAG(taskContext.appMaster)
+      topologyContext = getTopologyContext(dag, taskContext.taskId)
+      generalTopologyContext = getGeneralTopologyContext(dag)
+      collector = getOutputCollector(taskContext, topologyContext)
+      val ackEnabled = StormUtil.ackEnabled(config)
+      val boltCollector = new OutputCollector(new StormBoltOutputCollector(collector, ackEnabled))
+      bolt.prepare(config, topologyContext, boltCollector)
+    }
+
+    /** Turns the incoming [[GearpumpTuple]] into a Storm tuple and executes the bolt on it. */
+    override def next(message: Message): Unit = {
+      val timestamp = message.timestamp
+      collector.setTimestamp(timestamp)
+      val gearpumpTuple = message.msg.asInstanceOf[GearpumpTuple]
+      bolt.execute(gearpumpTuple.toTuple(generalTopologyContext, timestamp))
+    }
+
+    /** @return tick frequency in seconds from "topology.tick.tuple.freq.secs", if set */
+    def getTickFrequency: Option[Int] =
+      StormUtil.getInt(config, Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)
+
+    /**
+     * Executes the bolt on a tick tuple. It is called on each TICK message when
+     * "topology.tick.tuple.freq.secs" is configured. The tick tuple is built on the first
+     * call and reused afterwards.
+     *
+     * @param freq tick frequency in seconds
+     */
+    def tick(freq: Int): Unit = {
+      if (tickTuple == null) {
+        tickTuple = getTickTuple(generalTopologyContext, freq)
+      }
+      bolt.execute(tickTuple)
+    }
+  }
+
+  /**
+   * Copies the general Storm config into a new Java map. The component's own values of
+   * "topology.transactional.id" and "topology.tick.tuple.freq.secs" are laid over it when
+   * the component sets them.
+   *
+   * @param stormConfig general config for all components
+   * @param componentCommon common part of the component spec, carrying its JSON config
+   * @return a new mutable map; neither input is modified
+   */
+  private def normalizeConfig(stormConfig: Map[AnyRef, AnyRef],
+      componentCommon: ComponentCommon): JMap[AnyRef, AnyRef] = {
+    val normalized: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](stormConfig.asJava)
+    val componentConfig = parseJsonStringToMap(componentCommon.get_json_conf())
+    for (key <- Seq(Config.TOPOLOGY_TRANSACTIONAL_ID, Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
+      Option(componentConfig.get(key)).foreach(value => normalized.put(key, value))
+    }
+    normalized
+  }
+
+  /**
+   * Asks the app master for the DAG and blocks until it replies.
+   * Waits at most `Constants.FUTURE_TIMEOUT`; `Await.result` throws if no reply arrives in time.
+   */
+  private def askAppMasterForDAG(appMaster: ActorRef): DAG = {
+    implicit val askTimeout = Constants.FUTURE_TIMEOUT
+    Await.result((appMaster ? GetDAG).asInstanceOf[Future[DAG]], askTimeout.duration)
+  }
+
+  /**
+   * Builds the maps shared by both topology-context flavours:
+   *  - Storm task id to component id;
+   *  - component id to its sorted task ids;
+   *  - component id to its stream output fields.
+   */
+  private def contextMaps(dag: DAG, topology: StormTopology)
+    : (Map[Integer, String], JMap[String, JList[Integer]], JMap[String, JMap[String, Fields]]) = {
+    val taskToComponent = getTaskToComponent(dag)
+    (taskToComponent,
+      getComponentToSortedTasks(taskToComponent),
+      getComponentToStreamFields(topology))
+  }
+
+  /** Builds a Storm `GeneralTopologyContext` for the given DAG and topology. */
+  private def buildGeneralTopologyContext(dag: DAG, topology: StormTopology, stormConf: JMap[_, _])
+    : GeneralTopologyContext = {
+    val (taskToComponent, componentToSortedTasks, componentToStreamFields) =
+      contextMaps(dag, topology)
+    new GeneralTopologyContext(topology, stormConf, taskToComponent.asJava,
+      componentToSortedTasks, componentToStreamFields, null)
+  }
+
+  /**
+   * Builds a Storm `TopologyContext` for one Storm task. It calls mkCodeDir and mkPidDir,
+   * which create directories under the temp directory.
+   */
+  private def buildTopologyContext(
+      dag: DAG, topology: StormTopology, stormConf: JMap[_, _], stormTaskId: Integer)
+    : TopologyContext = {
+    val (taskToComponent, componentToSortedTasks, componentToStreamFields) =
+      contextMaps(dag, topology)
+    val codeDir = mkCodeDir
+    val pidDir = mkPidDir
+    new TopologyContext(topology, stormConf, taskToComponent.asJava, componentToSortedTasks,
+      componentToStreamFields, null, codeDir, pidDir, stormTaskId, null, null, null, null,
+      new JHashMap[String, AnyRef], new JHashMap[Integer, JMap[Integer, JMap[String, IMetric]]],
+      new Atom(false))
+  }
+
+  /**
+   * Maps every spout, every bolt and the system component to its stream-id -> output
+   * `Fields`. Bolt entries override spouts with the same id, and the system entry overrides both.
+   */
+  private def getComponentToStreamFields(topology: StormTopology)
+    : JMap[String, JMap[String, Fields]] = {
+    val spoutFields = topology.get_spouts().asScala.map { case (spoutId, spoutSpec) =>
+      spoutId -> getComponentToFields(spoutSpec.get_common())
+    }
+    val boltFields = topology.get_bolts().asScala.map { case (boltId, boltSpec) =>
+      boltId -> getComponentToFields(boltSpec.get_common())
+    }
+    // the system component only declares the tick stream
+    val tickFields = Map(SYSTEM_TICK_STREAM_ID -> new Fields(SYSTEM_COMPONENT_OUTPUT_FIELDS))
+    val systemFields = Map(SYSTEM_COMPONENT_ID -> tickFields.asJava)
+
+    (spoutFields ++ boltFields ++ systemFields).asJava
+  }
+
+  /** Maps each stream declared by a component to its output `Fields`. */
+  private def getComponentToFields(common: ComponentCommon): JMap[String, Fields] = {
+    val fieldsByStream = common.get_streams.asScala.map { case (streamId, streamInfo) =>
+      streamId -> new Fields(streamInfo.get_output_fields())
+    }
+    fieldsByStream.asJava
+  }
+
+  /** Groups Storm task ids by component id, each group sorted in ascending order. */
+  private def getComponentToSortedTasks(
+      taskToComponent: Map[Integer, String]): JMap[String, JList[Integer]] = {
+    val byComponent = taskToComponent.groupBy { case (_, component) => component }
+    byComponent.map { case (component, tasks) =>
+      component -> tasks.keys.toList.sorted.asJava
+    }.asJava
+  }
+
+  /**
+   * Maps every Storm task id in the DAG to its Storm component id. The component id is
+   * read from each processor's STORM_COMPONENT task config.
+   * `.get` assumes every processor in the DAG carries that setting and throws otherwise.
+   */
+  private def getTaskToComponent(dag: DAG): Map[Integer, String] = {
+    dag.processors.flatMap { case (processorId, description) =>
+      val component = description.taskConf.getString(STORM_COMPONENT).get
+      Range(0, description.parallelism).map { index =>
+        (gearpumpTaskIdToStorm(TaskId(processorId, index)), component)
+      }
+    }
+  }
+
+  /**
+   * Workaround to support Storm ShellBolt: makes sure `<tmp>/pid` exists and returns its path.
+   * A failure to create the directory is logged. The path is returned either way.
+   */
+  private def mkPidDir: String = {
+    val pidDir = FileUtils.getTempDirectoryPath + File.separator + "pid"
+    try {
+      FileUtils.forceMkdir(new File(pidDir))
+    } catch {
+      case ex: IOException =>
+        // keep the cause so the reason for the failure shows up in the log
+        LOG.error(s"failed to create pid directory $pidDir", ex)
+    }
+    pidDir
+  }
+
+  /**
+   * Workaround to support Storm ShellBolt.
+   *
+   * Extracts the last entry of the JVM class path into `<tmp>/storm` and returns
+   * `<tmp>/storm/resources`. The last entry is assumed to be the application jar (TODO confirm).
+   * Failures are logged and otherwise ignored; the returned path is the same either way.
+   */
+  private def mkCodeDir: String = {
+    // split on the platform's path separator (':' on Unix, ';' on Windows)
+    val jarPath = System.getProperty("java.class.path").split(File.pathSeparator).last
+    val destDir = FileUtils.getTempDirectoryPath + File.separator + "storm"
+
+    try {
+      FileUtils.forceMkdir(new File(destDir))
+
+      val jar = new JarFile(jarPath)
+      try {
+        jar.entries().asScala.filterNot(_.isDirectory).foreach { entry =>
+          val file = new File(destDir + File.separator + entry.getName)
+          file.getParentFile.mkdirs()
+
+          val is = jar.getInputStream(entry)
+          try {
+            val fos = new FileOutputStream(file)
+            try {
+              IOUtils.copy(is, fos)
+            } catch {
+              case ex: IOException =>
+                LOG.error(s"failed to copy data from ${entry.getName} to ${file.getName}", ex)
+            } finally {
+              fos.close()
+            }
+          } finally {
+            // closed even when the output file cannot be opened
+            is.close()
+          }
+        }
+      } finally {
+        // release the jar's file handle once extraction is finished
+        jar.close()
+      }
+    } catch {
+      case ex: IOException =>
+        LOG.error(s"could not extract $destDir from $jarPath", ex)
+    }
+
+    destDir + File.separator + "resources"
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
new file mode 100644
index 0000000..62bc25c
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.topology
+
+import java.lang.{Iterable => JIterable}
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
+
+import akka.actor.ActorSystem
+import backtype.storm.Config
+import backtype.storm.generated._
+import backtype.storm.utils.{ThriftTopologyUtils, Utils}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.processor.StormProcessor
+import org.apache.gearpump.experiments.storm.producer.StormProducer
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.experiments.storm.util.StormUtil
+import org.apache.gearpump.experiments.storm.util.StormUtil._
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.task.Task
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+// TODO: Refactor this file to stop using the implicit JavaConversions (use JavaConverters)
+// scalastyle:off javaconversions
+import scala.collection.JavaConversions._
+// scalastyle:on javaconversions
+
+object GearpumpStormTopology {
+  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology])
+
+  /**
+   * Creates a [[GearpumpStormTopology]]. The system config is read with
+   * `Utils.readStormConfig()`, and the application config is parsed from JSON.
+   *
+   * @param name topology name
+   * @param topology Storm topology
+   * @param appConfigInJson application config as a JSON string
+   * @param system actor system used when creating Gearpump processors
+   */
+  def apply(
+      name: String,
+      topology: StormTopology,
+      appConfigInJson: String)(implicit system: ActorSystem): GearpumpStormTopology = {
+    val sysConfig = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
+    val appConfig = parseJsonStringToMap(appConfigInJson)
+    new GearpumpStormTopology(name, topology, sysConfig, appConfig)
+  }
+}
+
+/**
+ * this is a wrapper over Storm topology which
+ * 1. merges Storm and Gearpump configs
+ * 2. creates Gearpump processors
+ * 3. provides interface for Gearpump applications to use Storm topology
+ *
+ * an implicit ActorSystem is required to create Gearpump processors
+ * @param name topology name
+ * @param topology Storm topology
+ * @param sysConfig configs from "defaults.yaml" and custom config file
+ * @param appConfig config submitted from user application
+ */
+private[storm] class GearpumpStormTopology(
+ name: String,
+ topology: StormTopology,
+ sysConfig: JMap[AnyRef, AnyRef],
+ appConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) {
+
+  private val spouts = topology.get_spouts()
+  private val bolts = topology.get_bolts()
+  // must be initialized before the processors below, which read it
+  private val stormConfig =
+    mergeConfigs(sysConfig, appConfig, getComponentConfigs(spouts, bolts))
+  private val spoutProcessors: Map[String, Processor[Task]] =
+    (for ((spoutId, spoutSpec) <- spouts)
+      yield spoutId -> spoutToProcessor(spoutId, spoutSpec, stormConfig.toMap)).toMap
+  private val boltProcessors: Map[String, Processor[Task]] =
+    (for ((boltId, boltSpec) <- bolts)
+      yield boltId -> boltToProcessor(boltId, boltSpec, stormConfig.toMap)).toMap
+  private val allProcessors = spoutProcessors ++ boltProcessors
+
+  /**
+   * @return merged Storm config with priority (lowest first)
+   *         defaults.yaml < custom file config < application config < component config;
+   *         component configs only contribute the merged kryo decorator/register settings
+   */
+  def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig
+
+  /** @return Storm component ids mapped to the Gearpump processors created for them */
+  def getProcessors: Map[String, Processor[Task]] = allProcessors
+
+  /**
+   * @param sourceId source component id
+   * @return downstream Storm component ids mapped to their Gearpump (bolt) processors
+   */
+  def getTargets(sourceId: String): Map[String, Processor[Task]] = {
+    val targetIds = getTargets(sourceId, topology).keys
+    targetIds.map(targetId => targetId -> boltProcessors(targetId)).toMap
+  }
+
+  /**
+   * Merges the system, application and component configs into a new map.
+   * Application values override system values. The kryo settings merged from component
+   * configs are applied next, and the topology name is set last.
+   */
+  private def mergeConfigs(
+      sysConfig: JMap[AnyRef, AnyRef],
+      appConfig: JMap[AnyRef, AnyRef],
+      componentConfigs: Iterable[JMap[AnyRef, AnyRef]]): JMap[AnyRef, AnyRef] = {
+    val merged = new JHashMap[AnyRef, AnyRef]
+    Seq(sysConfig, appConfig).foreach(layer => merged.putAll(layer))
+    merged.putAll(getMergedComponentConfig(componentConfigs, merged.toMap))
+    merged.put(Config.TOPOLOGY_NAME, name)
+    merged
+  }
+
+  /**
+   * Creates the Gearpump processor that runs a Storm spout.
+   *
+   * @param spoutId spout id, also used as the processor description
+   * @param spoutSpec spout spec
+   * @param stormConfig merged storm config
+   * @param system actor system
+   * @return a Processor[StormProducer]
+   */
+  private def spoutToProcessor(spoutId: String, spoutSpec: SpoutSpec,
+      stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): Processor[Task] = {
+    val parallelism = getParallelism(stormConfig, spoutSpec.get_common())
+    val taskConf = UserConfig.empty.withString(STORM_COMPONENT, spoutId)
+    Processor[StormProducer](parallelism, spoutId, taskConf)
+  }
+
+  /**
+   * Creates the Gearpump processor that runs a Storm bolt. State checkpointing is enabled
+   * exactly when acking is enabled in the Storm config.
+   *
+   * @param boltId bolt id, also used as the processor description
+   * @param boltSpec bolt spec
+   * @param stormConfig merged storm config
+   * @param system actor system
+   * @return a Processor[StormProcessor]
+   */
+  private def boltToProcessor(boltId: String, boltSpec: Bolt,
+      stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): Processor[Task] = {
+    val parallelism = getParallelism(stormConfig, boltSpec.get_common())
+    val checkpointEnabled = StormUtil.ackEnabled(stormConfig)
+    val taskConf = UserConfig.empty
+      .withString(STORM_COMPONENT, boltId)
+      .withBoolean("state.checkpoint.enable", checkpointEnabled)
+    Processor[StormProcessor](parallelism, boltId, taskConf)
+  }
+
+  /**
+   * Finds the components that subscribe to streams emitted by `componentId`.
+   *
+   * @return target component id mapped to (input stream id -> grouping), for every stream
+   *         the target subscribes to from `componentId`
+   */
+  private def getTargets(componentId: String, topology: StormTopology)
+    : Map[String, Map[String, Grouping]] = {
+    val subscriptions = for {
+      targetId <- ThriftTopologyUtils.getComponentIds(topology).toList
+      (globalStreamId, grouping) <- getInputs(targetId, topology).toList
+      if globalStreamId.get_componentId.equals(componentId)
+    } yield (targetId, globalStreamId.get_streamId(), grouping)
+
+    subscriptions.foldLeft(Map.empty[String, Map[String, Grouping]]) {
+      case (targets, (targetId, streamId, grouping)) =>
+        val streams = targets.getOrElse(targetId, Map.empty[String, Grouping])
+        targets + (targetId -> (streams + (streamId -> grouping)))
+    }
+  }
+
+  /** @return input streams of a Storm component mapped to their groupings */
+  private def getInputs(componentId: String, topology: StormTopology)
+    : JMap[GlobalStreamId, Grouping] = {
+    ThriftTopologyUtils.getComponentCommon(topology, componentId).get_inputs
+  }
+
+  /**
+   * Computes a Storm component's parallelism with these rules:
+   *  1. use "topology.tasks" when defined, otherwise the parallelism hint (default 1);
+   *  2. cap the result at "topology.max.task.parallelism" when defined;
+   *  3. component config values override system config values.
+   *
+   * @param stormConfig system configs, without the component's "topology.tasks" and
+   *                    "topology.max.task.parallelism"
+   * @return number of task instances for the component
+   */
+  private def getParallelism(stormConfig: Map[AnyRef, AnyRef], component: ComponentCommon): Int = {
+    val hint: Int =
+      if (component.is_set_parallelism_hint()) component.get_parallelism_hint() else 1
+    val componentConfig = parseJsonStringToMap(component.get_json_conf)
+    val merged = new JHashMap[AnyRef, AnyRef]
+    merged.putAll(stormConfig)
+    merged.putAll(componentConfig)
+
+    val numTasks: Int = getInt(merged, Config.TOPOLOGY_TASKS).getOrElse(hint)
+    getInt(merged, Config.TOPOLOGY_MAX_TASK_PARALLELISM) match {
+      case Some(maxParallelism) => math.min(maxParallelism, numTasks)
+      case None => numTasks
+    }
+  }
+
+  /** Parses each spout's and then each bolt's JSON component config. */
+  private def getComponentConfigs(spouts: JMap[String, SpoutSpec],
+      bolts: JMap[String, Bolt]): Iterable[JMap[AnyRef, AnyRef]] = {
+    val spoutConfigs = spouts.values.map { spoutSpec =>
+      parseJsonStringToMap(spoutSpec.get_common().get_json_conf())
+    }
+    val boltConfigs = bolts.values.map { boltSpec =>
+      parseJsonStringToMap(boltSpec.get_common().get_json_conf())
+    }
+    spoutConfigs ++ boltConfigs
+  }
+
+  /**
+   * Merges "topology.kryo.decorators" and "topology.kryo.register" across the component
+   * configs and the existing configs.
+   *
+   * @param componentConfigs list of component configs
+   * @param allConfig existing configs without merging component configs
+   * @return a config holding only those two merged settings (each only when non-empty)
+   */
+  private def getMergedComponentConfig(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
+      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
+    val parts = Seq(
+      getMergedKryoDecorators(componentConfigs, allConfig),
+      getMergedKryoRegister(componentConfigs, allConfig))
+    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    parts.foreach(part => mergedConfig.putAll(part))
+    mergedConfig
+  }
+
+  /**
+   * Collects the distinct "topology.kryo.decorators" entries from all component configs
+   * and the existing config.
+   *
+   * @param componentConfigs list of component configs
+   * @param allConfig existing configs without merging component configs
+   * @return a config with the list of distinct decorators, or an empty config if none are set
+   * @throws IllegalArgumentException if a value is not an Iterable of Strings
+   */
+  private def getMergedKryoDecorators(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
+      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
+    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1)
+    val key = Config.TOPOLOGY_KRYO_DECORATORS
+    val configs = getConfigValues(componentConfigs, allConfig, key)
+    val distincts = configs.foldLeft(Set.empty[String]) {
+      case (accum, config: JIterable[_]) =>
+        accum ++ config.map {
+          case s: String => s
+          case illegal =>
+            throw new IllegalArgumentException(s"$key must be a List of Strings; actually $illegal")
+        }
+      case (accum, null) =>
+        accum
+      case (_, illegal) =>
+        // report only the offending value, not the (accumulator, value) pair
+        throw new IllegalArgumentException(s"$key must be a List of Strings; actually $illegal")
+    }
+    if (distincts.nonEmpty) {
+      val decorators: JList[String] = new JArrayList(distincts.size)
+      decorators.addAll(distincts)
+      mergedConfig.put(key, decorators)
+    }
+    mergedConfig
+  }
+
+  /**
+   * Merges "topology.kryo.register" from all component configs and the existing config.
+   *
+   * Each value must be an Iterable whose elements are either:
+   *  - a class name, stored with a `null` value; or
+   *  - a Map of class name to serializer class name.
+   * Values are applied in order: component configs first, then the existing config.
+   * Later entries therefore override earlier ones, and the existing config wins over
+   * component configs. Empty lists are accepted and contribute nothing.
+   *
+   * @param componentConfigs list of component configs
+   * @param allConfig existing configs without merging component configs
+   * @return a config with the merged register map, or an empty config if nothing is set
+   * @throws IllegalArgumentException if a value or one of its elements has an unsupported type
+   */
+  private def getMergedKryoRegister(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
+      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
+    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1)
+    val key = Config.TOPOLOGY_KRYO_REGISTER
+    val configs = getConfigValues(componentConfigs, allConfig, key)
+    val merged = configs.foldLeft(Map.empty[String, String]) {
+      case (accum, config: JIterable[_]) =>
+        // fold instead of reduce: an empty register list must not throw
+        config.foldLeft(accum) {
+          case (acc, m: JMap[_, _]) =>
+            acc ++ m.map {
+              case (k: String, v: String) => k -> v
+              case illegal =>
+                throw new IllegalArgumentException(
+                  s"each element of $key must be a String or a Map of Strings; actually $illegal")
+            }
+          case (acc, s: String) =>
+            acc + (s -> null)
+          case (_, illegal) =>
+            throw new IllegalArgumentException(s"each element of $key must be a String or " +
+              s"a Map of Strings; actually $illegal")
+        }
+      case (accum, null) =>
+        accum
+      case (accum, illegal) =>
+        throw new IllegalArgumentException(
+          s"$key must be an Iterable containing only Strings or Maps of Strings; actually $illegal")
+    }
+    if (merged.nonEmpty) {
+      val registers: JMap[String, String] = new JHashMap[String, String](merged.size)
+      registers.putAll(merged)
+      mergedConfig.put(key, registers)
+    }
+    mergedConfig
+  }
+
+  /**
+   * Gathers the raw values of one config key: one entry per component config (possibly
+   * `null`), followed by the existing config's value when present.
+   *
+   * @param componentConfigs list of raw component configs
+   * @param allConfig existing configs without merging component configs
+   * @param key config key
+   * @return the collected values, component values first
+   */
+  private def getConfigValues(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
+      allConfig: Map[AnyRef, AnyRef], key: String): Iterable[AnyRef] = {
+    val fromComponents = componentConfigs.map(_.get(key))
+    val fromExisting = allConfig.get(key).toSeq
+    fromComponents ++ fromExisting
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
new file mode 100644
index 0000000..eb61acb
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.topology
+
+import java.util.{List => JList}
+
+import backtype.storm.task.GeneralTopologyContext
+import backtype.storm.tuple.{Tuple, TupleImpl}
+
+import org.apache.gearpump.TimeStamp
+
+/**
+ * this carries Storm tuple values in the Gearpump world
+ * the targetPartitions field dictate which tasks a GearpumpTuple should be sent to
+ * see [[org.apache.gearpump.experiments.storm.partitioner.StormPartitioner]] for more info
+ */
+private[storm] class GearpumpTuple(
+ val values: JList[AnyRef],
+ val sourceTaskId: Integer,
+ val sourceStreamId: String,
+ @transient val targetPartitions: Map[String, Array[Int]]) extends Serializable {
+ /**
+ * creates a Storm [[backtype.storm.tuple.Tuple]] to be passed to a Storm component
+ * this is needed for each incoming message
+ * because we cannot get [[backtype.storm.task.GeneralTopologyContext]] at deserialization
+ * @param topologyContext topology context used for all tasks
+ * @return a Tuple
+ */
+ def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = {
+ TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, timestamp)
+ }
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[GearpumpTuple]
+
+ override def equals(other: Any): Boolean = other match {
+ case that: GearpumpTuple =>
+ (that canEqual this) &&
+ values == that.values &&
+ sourceTaskId == that.sourceTaskId &&
+ sourceStreamId == that.sourceStreamId
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ val state = Seq(values, sourceTaskId, sourceStreamId)
+ state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+ }
+}
+
+ /**
+  * A Storm [[backtype.storm.tuple.TupleImpl]] that additionally carries a Gearpump timestamp.
+  *
+  * The message id handed to the underlying TupleImpl is always null.
+  */
+ case class TimedTuple(
+     topologyContext: GeneralTopologyContext,
+     tuple: JList[AnyRef],
+     sourceTaskId: Integer,
+     sourceStreamId: String,
+     timestamp: TimeStamp)
+   extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null)
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
new file mode 100644
index 0000000..777acab
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import org.apache.gearpump.experiments.storm.partitioner.StormPartitioner
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology
+import org.apache.gearpump.partitioner.Partitioner
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.task.Task
+import org.apache.gearpump.util.Graph
+
+ object GraphBuilder {
+
+   /**
+    * Builds a Gearpump DAG from a Storm topology.
+    *
+    * Every processor of the topology becomes a vertex. For each target returned by
+    * `topology.getTargets` for a processor, an edge carrying a [[StormPartitioner]] built
+    * from that target's id is added.
+    *
+    * @param topology a wrapper over Storm topology
+    * @return a DAG containing every processor of the topology
+    */
+   def build(topology: GearpumpStormTopology): Graph[Processor[_ <: Task], _ <: Partitioner] = {
+     val processorGraph = Graph.empty[Processor[Task], Partitioner]
+
+     topology.getProcessors.foreach { case (sourceId, sourceProcessor) =>
+       // Add the vertex explicitly. A processor with no incoming and no outgoing stream
+       // (e.g. a topology made of a single spout) would otherwise never be reached by
+       // addEdge and would silently disappear from the DAG.
+       processorGraph.addVertex(sourceProcessor)
+       topology.getTargets(sourceId).foreach { case (targetId, targetProcessor) =>
+         processorGraph.addEdge(sourceProcessor, new StormPartitioner(targetId), targetProcessor)
+       }
+     }
+
+     processorGraph
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala
new file mode 100644
index 0000000..1d04af6
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import java.util.{List => JList}
+import scala.util.Random
+
+import backtype.storm.generated.GlobalStreamId
+import backtype.storm.grouping.CustomStreamGrouping
+import backtype.storm.task.TopologyContext
+import backtype.storm.tuple.Fields
+
+/**
+ * Grouper is identical to that in storm but return gearpump partitions for storm tuple values
+ */
+sealed trait Grouper {
+ /**
+ * @param taskId storm task id of source task
+ * @param values storm tuple values
+ * @return a list of gearpump partitions
+ */
+ def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int]
+}
+
+/**
+ * GlobalGrouper always returns partition 0
+ */
+class GlobalGrouper extends Grouper {
+ override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = Array(0)
+}
+
+/**
+ * NoneGrouper randomly returns partition
+ *
+ * @param numTasks number of target tasks
+ */
+class NoneGrouper(numTasks: Int) extends Grouper {
+ private val random = new Random
+
+ override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+ val partition = StormUtil.mod(random.nextInt, numTasks)
+ Array(partition)
+ }
+}
+
+/**
+ * ShuffleGrouper shuffles partitions and returns them sequentially, and then shuffles again
+ *
+ * @param numTasks number of target tasks
+ */
+class ShuffleGrouper(numTasks: Int) extends Grouper {
+ private val random = new Random
+ private var index = -1
+ private var partitions = List.empty[Int]
+
+ override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+ index += 1
+ if (partitions.isEmpty) {
+ partitions = 0.until(numTasks).toList
+ partitions = random.shuffle(partitions)
+ } else if (index >= numTasks) {
+ index = 0
+ partitions = random.shuffle(partitions)
+ }
+ Array(partitions(index))
+ }
+}
+
+/**
+ * FieldsGrouper returns partition based on value of groupFields
+ *
+ * @param outFields declared output fields of source task
+ * @param groupFields grouping fields of target tasks
+ * @param numTasks number of target tasks
+ */
+class FieldsGrouper(outFields: Fields, groupFields: Fields, numTasks: Int) extends Grouper {
+
+ override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+ val hash = outFields.select(groupFields, values).hashCode()
+ val partition = StormUtil.mod(hash, numTasks)
+ Array(partition)
+ }
+}
+
+/**
+ * AllGrouper returns all partitions
+ *
+ * @param numTasks number of target tasks
+ */
+class AllGrouper(numTasks: Int) extends Grouper {
+ val partitions = (0 until numTasks).toArray
+
+ override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+ partitions
+ }
+}
+
+/**
+ * CustomGrouper allows users to specify grouping strategy
+ *
+ * @param grouping see [[backtype.storm.grouping.CustomStreamGrouping]]
+ */
+class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper {
+
+ def prepare(
+ topologyContext: TopologyContext, globalStreamId: GlobalStreamId, targetTasks: JList[Integer])
+ : Unit = {
+ grouping.prepare(topologyContext, globalStreamId, targetTasks)
+ }
+
+ override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+ val tasks = grouping.chooseTasks(taskId, values)
+ val result = new Array[Int](tasks.size())
+
+ val iter = tasks.iterator()
+
+ var index = 0
+ while (iter.hasNext()) {
+ val value = iter.next()
+ result(index) = StormUtil.stormTaskIdToGearpump(value).index
+ index += 1
+ }
+ result
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala
new file mode 100644
index 0000000..f1c736c
--- /dev/null
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+ object StormConstants {
+   // Names used for Storm components, topologies and configs.
+   val STORM_COMPONENT: String = "storm_component"
+   val STORM_TOPOLOGY: String = "storm_topology"
+   val STORM_CONFIG: String = "storm_config"
+
+   // System component: its id, output fields, task id and tick stream id.
+   val SYSTEM_COMPONENT_ID: String = "__system"
+   val SYSTEM_COMPONENT_OUTPUT_FIELDS: String = "rate_secs"
+   val SYSTEM_TASK_ID: Integer = -1
+   val SYSTEM_TICK_STREAM_ID: String = "__tick"
+
+   // Checkpoint interval in milliseconds (2 seconds).
+   val CHECKPOINT_INTERVAL_MILLIS: Int = 2000
+
+   // Config key naming the Storm serialization framework.
+   val STORM_SERIALIZATION_FRAMEWORK: String = "gearpump.storm.serialization-framework"
+ }