You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:33 UTC

[23/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
new file mode 100644
index 0000000..49afe05
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.producer
+
+import java.util.{List => JList}
+
+import backtype.storm.spout.ISpout
+import backtype.storm.utils.Utils
+import org.apache.gearpump.experiments.storm.util.StormOutputCollector
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import scala.collection.JavaConverters._
+
+class StormSpoutOutputCollectorSpec
+  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  // A spout-level emit (third argument null) must be forwarded to the wrapped StormOutputCollector.
+  property("StormSpoutOutputCollector should call StormOutputCollector") {
+    val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 1000))
+    val valuesGen = Gen.listOf[AnyRef](valGen)
+    // Mocks are created per generated sample, so each verify() observes a single emit call.
+    forAll(valuesGen) { (values: List[AnyRef]) =>
+      val collector = mock[StormOutputCollector]
+      val spout = mock[ISpout]
+      val streamId = Utils.DEFAULT_STREAM_ID
+      val spoutCollector = new StormSpoutOutputCollector(collector, spout, false)
+      spoutCollector.emit(streamId, values.asJava, null)
+      verify(collector).emit(streamId, values.asJava)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
new file mode 100644
index 0000000..bdea50c
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.storm.topology
+
+import java.util.{Map => JMap}
+
+import akka.actor.ActorRef
+import backtype.storm.spout.{ISpout, SpoutOutputCollector}
+import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
+import backtype.storm.tuple.Tuple
+import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
+import org.apache.gearpump.experiments.storm.util.StormOutputCollector
+import org.apache.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
+import org.apache.gearpump.streaming.{DAG, MockUtil}
+import org.apache.gearpump.{Message, TimeStamp}
+import org.mockito.Matchers.{anyObject, eq => mockitoEq}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class GearpumpStormComponentSpec
+  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  // Both properties build the component from mocked factory functions and verify delegation.
+  property("GearpumpSpout lifecycle") {
+    val config = mock[JMap[AnyRef, AnyRef]]
+    val spout = mock[ISpout]
+    val taskContext = MockUtil.mockTaskContext
+    val appMaster = mock[ActorRef]
+    when(taskContext.appMaster).thenReturn(appMaster)
+    val getDAG = mock[ActorRef => DAG]
+    val dag = mock[DAG]
+    when(getDAG(appMaster)).thenReturn(dag)
+    val getTopologyContext = mock[(DAG, TaskId) => TopologyContext]
+    val topologyContext = mock[TopologyContext]
+    when(getTopologyContext(dag, taskContext.taskId)).thenReturn(topologyContext)
+    val getOutputCollector = mock[(TaskContext, TopologyContext) => StormSpoutOutputCollector]
+    val outputCollector = mock[StormSpoutOutputCollector]
+    when(getOutputCollector(taskContext, topologyContext)).thenReturn(outputCollector)
+
+    val gearpumpSpout = GearpumpSpout(config, spout, getDAG, getTopologyContext,
+      getOutputCollector, ackEnabled = false, taskContext)
+
+    // Start: the wrapped spout must be opened with the config and the stubbed topology context
+    val startTime = mock[StartTime]
+    gearpumpSpout.start(startTime)
+
+    verify(spout).open(mockitoEq(config), mockitoEq(topologyContext),
+      anyObject[SpoutOutputCollector])
+
+    // Next: an incoming message must trigger nextTuple() on the wrapped spout
+    val message = mock[Message]
+    gearpumpSpout.next(message)
+
+    verify(spout).nextTuple()
+  }
+
+  property("GearpumpBolt lifecycle") {
+    val timestampGen = Gen.chooseNum[Long](0L, 1000L)
+    val freqGen = Gen.chooseNum[Int](1, 100)
+    forAll(timestampGen, freqGen) { (timestamp: TimeStamp, freq: Int) =>
+      val config = mock[JMap[AnyRef, AnyRef]]
+      val bolt = mock[IBolt]
+      val taskContext = MockUtil.mockTaskContext
+      val appMaster = mock[ActorRef]
+      when(taskContext.appMaster).thenReturn(appMaster)
+      val getDAG = mock[ActorRef => DAG]
+      val dag = mock[DAG]
+      when(getDAG(appMaster)).thenReturn(dag)
+      val getTopologyContext = mock[(DAG, TaskId) => TopologyContext]
+      val topologyContext = mock[TopologyContext]
+      when(getTopologyContext(dag, taskContext.taskId)).thenReturn(topologyContext)
+      val getGeneralTopologyContext = mock[DAG => GeneralTopologyContext]
+      val generalTopologyContext = mock[GeneralTopologyContext]
+      when(getGeneralTopologyContext(dag)).thenReturn(generalTopologyContext)
+      val getOutputCollector = mock[(TaskContext, TopologyContext) => StormOutputCollector]
+      val stormOutputCollector = mock[StormOutputCollector]
+      when(getOutputCollector(taskContext, topologyContext)).thenReturn(stormOutputCollector)
+      val getTickTuple = mock[(GeneralTopologyContext, Int) => Tuple]
+      val tickTuple = mock[Tuple]
+      when(getTickTuple(mockitoEq(generalTopologyContext), anyObject[Int]())).thenReturn(tickTuple)
+      val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext,
+        getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext)
+
+      // Start: the wrapped bolt must be prepared with the config and the stubbed topology context
+      val startTime = mock[StartTime]
+      gearpumpBolt.start(startTime)
+
+      verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext),
+        anyObject[OutputCollector])
+
+      // Next: the message's GearpumpTuple is converted to a Storm tuple, which the bolt executes
+      val gearpumpTuple = mock[GearpumpTuple]
+      val tuple = mock[Tuple]
+      when(gearpumpTuple.toTuple(generalTopologyContext, timestamp)).thenReturn(tuple)
+      val message = Message(gearpumpTuple, timestamp)
+      gearpumpBolt.next(message)
+      // The message timestamp must also be set on the output collector.
+      verify(stormOutputCollector).setTimestamp(timestamp)
+      verify(bolt).execute(tuple)
+
+      // Tick: the tuple returned by the stubbed getTickTuple must be executed by the bolt
+      gearpumpBolt.tick(freq)
+      verify(bolt).execute(tickTuple)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
new file mode 100644
index 0000000..ef383ad
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.topology
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import backtype.storm.Config
+import org.apache.gearpump.experiments.storm.processor.StormProcessor
+import org.apache.gearpump.experiments.storm.producer.StormProducer
+import org.apache.gearpump.experiments.storm.util.TopologyUtil
+import org.apache.gearpump.streaming.MockUtil
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
+import scala.collection.JavaConverters._
+
+class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar {
+  import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopologySpec._
+  // Exercises config merging, processor creation and target lookup on the test topology.
+  "GearpumpStormTopology" should {
+    "merge configs with defined priority" in {
+      val stormTopology = TopologyUtil.getTestTopology
+      val name = "name"
+      val sysVal = "sys"
+      val sysConfig = newJavaConfig(name, sysVal)
+      val appVal = "app"
+      val appConfig = newJavaConfig(name, appVal)
+      // Expected: a value in the fourth argument (appConfig) overrides the third (sysConfig).
+      implicit val system = MockUtil.system
+      val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig,
+        newEmptyConfig)
+      topology1.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology1"
+      topology1.getStormConfig should not contain key (name)
+
+      val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig,
+        newEmptyConfig)
+      topology2.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology2"
+      topology2.getStormConfig.get(name) shouldBe sysVal
+
+      val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig)
+      topology3.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology3"
+      topology3.getStormConfig.get(name) shouldBe appVal
+    }
+
+    "create Gearpump processors from Storm topology" in {
+      val stormTopology = TopologyUtil.getTestTopology
+      implicit val system = MockUtil.system
+      val gearpumpStormTopology =
+        GearpumpStormTopology("app", stormTopology, null)
+      val processors = gearpumpStormTopology.getProcessors
+      stormTopology.get_spouts().asScala.foreach { case (spoutId, _) =>
+        val processor = processors(spoutId)
+        processor.taskClass shouldBe classOf[StormProducer]
+        processor.description shouldBe spoutId
+      }
+      stormTopology.get_bolts().asScala.foreach { case (boltId, _) =>
+        val processor = processors(boltId)
+        processor.taskClass shouldBe classOf[StormProcessor]
+        processor.description shouldBe boltId
+      }
+    }
+
+    "get target processors from source id" in {
+      val stormTopology = TopologyUtil.getTestTopology
+      implicit val system = MockUtil.system
+      // Only the topology matters here; the trailing config argument is passed as null.
+      val gearpumpStormTopology =
+        GearpumpStormTopology("app", stormTopology, null)
+      val targets0 = gearpumpStormTopology.getTargets("1")
+      targets0 should contain key "3"
+      targets0 should contain key "4"
+      val targets1 = gearpumpStormTopology.getTargets("2")
+      targets1 should contain key "3"
+    }
+  }
+}
+
+object GearpumpStormTopologySpec {
+  /** Creates a fresh, empty Java config map on every call. */
+  def newEmptyConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+
+  /** Creates a fresh Java config map holding exactly the single entry `key -> value`. */
+  def newJavaConfig(key: AnyRef, value: AnyRef): JMap[AnyRef, AnyRef] = {
+    val singleEntry = newEmptyConfig
+    singleEntry.put(key, value)
+    singleEntry
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
new file mode 100644
index 0000000..f12e54f
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.storm.topology
+
+import java.util.{List => JList}
+
+import backtype.storm.task.GeneralTopologyContext
+import backtype.storm.tuple.Fields
+import org.apache.gearpump.TimeStamp
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import scala.collection.JavaConverters._
+
+class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  // toTuple must yield a TimedTuple carrying the tuple's own data plus the supplied timestamp.
+  property("GearpumpTuple should create Storm Tuple") {
+    val tupleGen = for {
+      values <- Gen.listOf[String](Gen.alphaStr).map(_.distinct.asJava.asInstanceOf[JList[AnyRef]])
+      sourceTaskId <- Gen.chooseNum[Int](0, Int.MaxValue)
+      sourceStreamId <- Gen.alphaStr
+    } yield new GearpumpTuple(values, Integer.valueOf(sourceTaskId), sourceStreamId, null)
+    // Values are de-duplicated; they double as Fields names below (presumably must be unique).
+    forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) {
+      (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) =>
+        val topologyContext = mock[GeneralTopologyContext]
+        val fields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*)
+        when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId)
+        when(topologyContext.getComponentOutputFields(
+          componentId, gearpumpTuple.sourceStreamId)).thenReturn(fields)
+        // Convert using the stubbed context (component id and output fields set up above).
+        val tuple = gearpumpTuple.toTuple(topologyContext, timestamp)
+
+        tuple shouldBe a[TimedTuple]
+        val timedTuple = tuple.asInstanceOf[TimedTuple]
+        timedTuple.getValues shouldBe gearpumpTuple.values
+        timedTuple.getSourceTask shouldBe gearpumpTuple.sourceTaskId
+        timedTuple.getSourceComponent shouldBe componentId
+        timedTuple.getSourceStreamId shouldBe gearpumpTuple.sourceStreamId
+        timedTuple.getMessageId shouldBe null
+        timedTuple.getFields shouldBe fields
+        timedTuple.timestamp shouldBe timestamp
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
new file mode 100644
index 0000000..9cf5009
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.experiments.storm.partitioner.StormPartitioner
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.task.Task
+
+class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar {
+
+  "GraphBuilder" should {
+    "build Graph from Storm topology" in {
+      // Two mocked processors: "source" feeds "target", and "target" feeds nothing.
+      val (upstreamId, downstreamId) = ("source", "target")
+      val upstream = mock[Processor[Task]]
+      val downstream = mock[Processor[Task]]
+      val noTargets = Map.empty[String, Processor[Task]]
+
+      val stormTopology = mock[GearpumpStormTopology]
+      when(stormTopology.getProcessors).thenReturn(
+        Map(upstreamId -> upstream, downstreamId -> downstream))
+      when(stormTopology.getTargets(upstreamId)).thenReturn(Map(downstreamId -> downstream))
+      when(stormTopology.getTargets(downstreamId)).thenReturn(noTargets)
+
+      val builtEdges = GraphBuilder.build(stormTopology).edges
+      // Exactly one edge is expected: upstream --StormPartitioner--> downstream.
+      builtEdges.size shouldBe 1
+      val onlyEdge = builtEdges.head
+      onlyEdge._1 shouldBe upstream
+      onlyEdge._2 shouldBe a[StormPartitioner]
+      onlyEdge._3 shouldBe downstream
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala
new file mode 100644
index 0000000..c1cdb3b
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import java.util.{List => JList}
+import scala.collection.JavaConverters._
+
+import backtype.storm.generated.GlobalStreamId
+import backtype.storm.grouping.CustomStreamGrouping
+import backtype.storm.task.TopologyContext
+import backtype.storm.tuple.Fields
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.experiments.storm.util.GrouperSpec.Value
+
+class GrouperSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  // Shared generators: a task id, tuple values (as a Java list) and a task count of at least 1.
+  val taskIdGen = Gen.chooseNum[Int](0, 1000)
+  val valuesGen = Gen.listOf[String](Gen.alphaStr)
+    .map(_.asJava.asInstanceOf[JList[AnyRef]])
+  val numTasksGen = Gen.chooseNum[Int](1, 1000)
+
+  property("GlobalGrouper should always return partition 0") {
+    forAll(taskIdGen, valuesGen) { (taskId: Int, values: JList[AnyRef]) =>
+      val grouper = new GlobalGrouper
+      grouper.getPartitions(taskId, values) shouldBe List(0)
+    }
+  }
+
+  property("NoneGrouper should return partition in the range [0, numTasks)") {
+    forAll(taskIdGen, valuesGen, numTasksGen) {
+      (taskId: Int, values: JList[AnyRef], numTasks: Int) =>
+        val grouper = new NoneGrouper(numTasks)
+        val partitions = grouper.getPartitions(taskId, values)
+        partitions.size shouldBe 1
+        partitions.head should (be >= 0 and be < numTasks)
+    }
+  }
+
+  property("ShuffleGrouper should return partition in the range [0, numTasks)") {
+    forAll(taskIdGen, valuesGen, numTasksGen) {
+      (taskId: Int, values: JList[AnyRef], numTasks: Int) =>
+        val grouper = new ShuffleGrouper(numTasks)
+        val partitions = grouper.getPartitions(taskId, values)
+        partitions.size shouldBe 1
+        partitions.head should (be >= 0 and be < numTasks)
+    }
+  }
+
+  property("FieldsGrouper should return partition according to fields") {
+    forAll(taskIdGen, numTasksGen) {
+      (taskId: Int, numTasks: Int) =>
+        val values = 0.until(numTasks).map(i => new Value(i))
+        val fields = values.map(_.toString)
+        val outFields = new Fields(fields: _*)
+        values.flatMap { v =>
+          val groupFields = new Fields(v.toString)
+          val grouper = new FieldsGrouper(outFields, groupFields, numTasks)
+          grouper.getPartitions(taskId,
+            values.toList.asJava.asInstanceOf[JList[AnyRef]])
+        }.distinct.size shouldBe numTasks
+    }
+  }
+
+  property("AllGrouper should return all partitions") {
+    forAll(taskIdGen, numTasksGen, valuesGen) {
+      (taskId: Int, numTasks: Int, values: JList[AnyRef]) =>
+        val grouper = new AllGrouper(numTasks)
+        val partitions = grouper.getPartitions(taskId, values)
+        partitions.distinct.size shouldBe numTasks
+        partitions.min shouldBe 0
+        partitions.max shouldBe (numTasks - 1)
+    }
+  }
+
+  property("CustomGrouper should return partitions specified by user") {
+    val grouping = mock[CustomStreamGrouping]
+    val grouper = new CustomGrouper(grouping)
+    val topologyContext = mock[TopologyContext]
+    val globalStreamId = mock[GlobalStreamId]
+    val sourceTasks = mock[JList[Integer]]
+
+    grouper.prepare(topologyContext, globalStreamId, sourceTasks)
+
+    verify(grouping).prepare(topologyContext, globalStreamId, sourceTasks)
+    // Whatever single task the stubbed grouping chooses must come back as the only partition.
+    forAll(taskIdGen, valuesGen, numTasksGen) {(taskId: Int, values: JList[AnyRef], taskNum: Int) =>
+      0.until(taskNum).foreach { i =>
+        when(grouping.chooseTasks(taskId, values)).thenReturn(List(Integer.valueOf(i)).asJava)
+        grouper.getPartitions(taskId, values) shouldBe List(i)
+      }
+    }
+  }
+}
+
+object GrouperSpec {
+  /**
+   * Test value identified solely by its wrapped integer: `toString`, `hashCode`
+   * and `equals` are all derived from `i`.
+   */
+  class Value(val i: Int) extends AnyRef {
+    override def toString: String = i.toString
+
+    override def hashCode(): Int = i
+
+    override def equals(other: Any): Boolean = other match {
+      case that: Value => that.i == i
+      case _ => false
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
new file mode 100644
index 0000000..e0e9e61
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.storm.util
+
+import java.util.{List => JList, Map => JMap}
+import scala.collection.JavaConverters._
+
+import backtype.storm.generated.Grouping
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump._
+import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
+import org.apache.gearpump.streaming.MockUtil
+
+class StormOutputCollectorSpec
+  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  // Fixed emitting task id plus generators shared by both properties.
+  val stormTaskId = 0
+  val streamIdGen = Gen.alphaStr
+  val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
+  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
+
+  property("StormOutputCollector emits tuple values into a stream") {
+    forAll(timestampGen, streamIdGen, valuesGen) {
+      (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) =>
+        val targets = mock[JMap[String, JMap[String, Grouping]]]
+        val taskToComponent = mock[JMap[Integer, String]]
+        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
+          (Map[String, Array[Int]], JList[Integer])]
+        val targetPartitions = mock[Map[String, Array[Int]]]
+        val targetStormTaskIds = mock[JList[Integer]]
+        when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions,
+          targetStormTaskIds))
+        val taskContext = MockUtil.mockTaskContext
+        val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
+          targets, getTargetPartitionsFn, taskContext, LatestTime)
+        // Stream absent from targets: EMPTY_LIST is returned and nothing is output.
+        when(targets.containsKey(streamId)).thenReturn(false)
+        stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST
+        verify(taskContext, times(0)).output(anyObject[Message])
+        // Stream present in targets: one message with the tuple and the preset timestamp is output.
+        when(targets.containsKey(streamId)).thenReturn(true)
+        stormOutputCollector.setTimestamp(timestamp)
+        stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds
+        verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
+          case Message(tuple: GearpumpTuple, t) =>
+            val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
+            tuple == expected && t == timestamp
+        }))
+    }
+  }
+
+  property("StormOutputCollector emit direct to a task") {
+    val idGen = Gen.chooseNum[Int](0, 1000)
+    val targetGen = Gen.alphaStr
+    forAll(idGen, targetGen, timestampGen, streamIdGen, valuesGen) {
+      (id: Int, target: String, timestamp: Long, streamId: String, values: JList[AnyRef]) =>
+        val targets = mock[JMap[String, JMap[String, Grouping]]]
+        val taskToComponent = mock[JMap[Integer, String]]
+        when(taskToComponent.get(id)).thenReturn(target)
+        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
+          (Map[String, Array[Int]], JList[Integer])]
+        val targetPartitions = mock[Map[String, Array[Int]]]
+        val targetStormTaskIds = mock[JList[Integer]]
+        when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions,
+          targetStormTaskIds))
+        val taskContext = MockUtil.mockTaskContext
+        val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
+          targets, getTargetPartitionsFn, taskContext, LatestTime)
+        // NOTE(review): no emit call precedes this verify, so it only checks the untouched mock.
+        when(targets.containsKey(streamId)).thenReturn(false)
+        verify(taskContext, times(0)).output(anyObject[Message])
+        // Direct emit: the tuple must target only the partition derived from the given task id.
+        when(targets.containsKey(streamId)).thenReturn(true)
+        stormOutputCollector.setTimestamp(timestamp)
+        stormOutputCollector.emitDirect(id, streamId, values)
+        val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index)
+        verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
+          case Message(tuple: GearpumpTuple, t) => {
+            val expected = new GearpumpTuple(values, stormTaskId, streamId,
+            Map(target -> partitions))
+
+            val result = tuple == expected && t == timestamp
+            result
+          }
+        }))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
new file mode 100644
index 0000000..e787c3d
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
+import scala.collection.JavaConverters._
+
+import akka.actor.ExtendedActorSystem
+import backtype.storm.utils.Utils
+import com.esotericsoftware.kryo.Kryo
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.streaming.MockUtil
+
+/**
+ * Property checks for the Storm serialization support: obtaining a [[StormSerializer]] from
+ * [[StormSerializationFramework]] and round-tripping a [[GearpumpTuple]] through it.
+ */
+class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  property("StormSerializerPool should create and manage StormSerializer") {
+    val taskContext = MockUtil.mockTaskContext
+    val serializerPool = new StormSerializationFramework
+    val system = taskContext.system.asInstanceOf[ExtendedActorSystem]
+    // NOTE(review): presumably resolved implicitly by UserConfig.withValue below; confirm.
+    implicit val actorSystem = system
+    // The Storm defaults are stored in the user config under STORM_CONFIG.
+    val stormConfig = Utils.readDefaultConfig.asInstanceOf[JMap[AnyRef, AnyRef]]
+    val config = UserConfig.empty.withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, stormConfig)
+    serializerPool.init(system, config)
+    serializerPool.get shouldBe a[StormSerializer]
+  }
+
+  property("StormSerializer should serialize and deserialize GearpumpTuple") {
+    // Tuples carry arbitrary alphabetic values, a non-negative source task id, an alphabetic
+    // stream id and a null fourth constructor argument.
+    val tupleGen = for {
+      values <- Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
+      sourceTaskId <- Gen.chooseNum[Int](0, Int.MaxValue)
+      sourceStreamId <- Gen.alphaStr
+      // Integer.valueOf replaces the boxing constructor, which newer JDKs deprecate.
+    } yield new GearpumpTuple(values, Integer.valueOf(sourceTaskId), sourceStreamId, null)
+
+    // One Kryo instance is shared; a fresh serializer is built for every generated tuple.
+    val kryo = new Kryo
+    forAll(tupleGen) { (tuple: GearpumpTuple) =>
+      val serializer = new StormSerializer(kryo)
+      serializer.deserialize(serializer.serialize(tuple)) shouldBe tuple
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala
new file mode 100644
index 0000000..36d84cb
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.util.{HashMap => JHashMap, Map => JMap}
+import scala.collection.JavaConverters._
+
+import backtype.storm.Config
+import backtype.storm.generated.StormTopology
+import org.apache.storm.shade.org.json.simple.JSONValue
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.experiments.storm.util.StormUtil._
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.TaskId
+
+/**
+ * Property checks for the helpers imported from [[StormUtil]]: task id conversion, component
+ * lookup, JSON parsing, typed config accessors, `mod` and ack detection.
+ */
+class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  property("convert Storm task ids to gearpump TaskIds and back") {
+    // Every non-negative Storm task id must survive the round trip through a TaskId.
+    val idGen = Gen.chooseNum[Int](0, Int.MaxValue)
+    forAll(idGen) { (stormTaskId: Int) =>
+      gearpumpTaskIdToStorm(stormTaskIdToGearpump(stormTaskId)) shouldBe stormTaskId
+    }
+
+    // NOTE(review): both components are capped at Int.MaxValue >> 16, presumably so that the
+    // pair fits into a single Storm task id; confirm against StormUtil.
+    val processorIdGen = Gen.chooseNum[Int](0, Int.MaxValue >> 16)
+    val indexGen = Gen.chooseNum[Int](0, Int.MaxValue >> 16)
+    forAll(processorIdGen, indexGen) { (processorId: Int, index: Int) =>
+      val taskId = TaskId(processorId, index)
+      stormTaskIdToGearpump(gearpumpTaskIdToStorm(taskId)) shouldBe taskId
+    }
+  }
+
+  property("get GearpumpStormComponent from user config") {
+    val taskContext = MockUtil.mockTaskContext
+    val topology = TopologyUtil.getTestTopology
+    // NOTE(review): presumably resolved implicitly by UserConfig.withValue below; confirm.
+    implicit val actorSystem = taskContext.system
+    val userConfig = UserConfig.empty
+      .withValue[StormTopology](STORM_TOPOLOGY, topology)
+      .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef])
+    // Every spout id of the test topology must resolve to a GearpumpSpout ...
+    topology.get_spouts.asScala.foreach { case (spoutId, _) =>
+      val config = userConfig.withString(STORM_COMPONENT, spoutId)
+      val component = getGearpumpStormComponent(taskContext, config)(taskContext.system)
+      component shouldBe a[GearpumpSpout]
+    }
+    // ... and every bolt id to a GearpumpBolt.
+    topology.get_bolts.asScala.foreach { case (boltId, _) =>
+      val config = userConfig.withString(STORM_COMPONENT, boltId)
+      val component = getGearpumpStormComponent(taskContext, config)(taskContext.system)
+      component shouldBe a[GearpumpBolt]
+    }
+  }
+
+  property("parse json to map") {
+    // Maps whose keys equal their values, built from arbitrary alphabetic strings.
+    val mapGen = Gen.listOf[String](Gen.alphaStr)
+      .map(_.map(s => (s, s)).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]])
+
+    forAll(mapGen) { (map: JMap[AnyRef, AnyRef]) =>
+      parseJsonStringToMap(JSONValue.toJSONString(map)) shouldBe map
+    }
+
+    // Inputs that are not a JSON object are expected to produce an empty map.
+    val invalidJsonGen: Gen[String] = Gen.oneOf(null, "", "1")
+    forAll(invalidJsonGen) { (invalidJson: String) =>
+      val map = parseJsonStringToMap(invalidJson)
+      map shouldBe empty
+      map shouldBe a[JMap[_, _]]
+    }
+  }
+
+  property("get int from config") {
+    val name = "int"
+    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    // A missing key and a null value both yield None.
+    getInt(conf, name) shouldBe None
+    conf.put(name, null)
+    getInt(conf, name) shouldBe None
+
+    // Integer.valueOf / JLong.valueOf replace the boxing constructors, which newer JDKs
+    // deprecate.
+    forAll(Gen.chooseNum[Int](Int.MinValue, Int.MaxValue)) { (int: Int) =>
+      conf.put(name, Integer.valueOf(int))
+      getInt(conf, name) shouldBe Some(int)
+    }
+
+    // A Long within the Int range is accepted; compare as Int rather than relying on
+    // Int/Long cross-type equality.
+    forAll(Gen.chooseNum[Long](Int.MinValue, Int.MaxValue)) { (long: Long) =>
+      conf.put(name, JLong.valueOf(long))
+      getInt(conf, name) shouldBe Some(long.toInt)
+    }
+
+    // Alphabetic strings are rejected.
+    forAll(Gen.alphaStr) { (s: String) =>
+      conf.put(name, s)
+      an[IllegalArgumentException] should be thrownBy getInt(conf, name)
+    }
+  }
+
+  property("get boolean from config") {
+    val name = "boolean"
+    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    // A missing key and a null value both yield None.
+    getBoolean(conf, name) shouldBe None
+    conf.put(name, null)
+    getBoolean(conf, name) shouldBe None
+
+    forAll(Gen.oneOf(true, false)) { (boolean: Boolean) =>
+      conf.put(name, JBoolean.valueOf(boolean))
+      getBoolean(conf, name) shouldBe Some(boolean)
+    }
+
+    // Alphabetic strings are rejected.
+    forAll(Gen.alphaStr) { (s: String) =>
+      conf.put(name, s)
+      an[IllegalArgumentException] should be thrownBy getBoolean(conf, name)
+    }
+  }
+
+  property("mod should be correct") {
+    // In these samples a non-zero result carries the sign of the divisor.
+    mod(10, 5) shouldBe 0
+    mod(10, 6) shouldBe 4
+    mod(10, -3) shouldBe -2
+    mod(-2, 5) shouldBe 3
+    mod(-1, -2) shouldBe -1
+  }
+
+  property("get whether ack enabled") {
+    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    // Missing key: disabled. Explicit 0: disabled. Explicit null: enabled.
+    ackEnabled(conf) shouldBe false
+    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, Integer.valueOf(0))
+    ackEnabled(conf) shouldBe false
+    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, null)
+    ackEnabled(conf) shouldBe true
+    // Any non-zero executor count, including negative ones, enables acking.
+    forAll(Gen.chooseNum[Int](Int.MinValue, Int.MaxValue)) {
+      (ackers: Int) =>
+        conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, Integer.valueOf(ackers))
+        if (ackers == 0) {
+          ackEnabled(conf) shouldBe false
+        } else {
+          ackEnabled(conf) shouldBe true
+        }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala
new file mode 100644
index 0000000..886013c
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import backtype.storm.generated.StormTopology
+import backtype.storm.testing.{TestGlobalCount, TestWordCounter, TestWordSpout}
+import backtype.storm.topology.TopologyBuilder
+import backtype.storm.tuple.Fields
+import backtype.storm.utils.Utils
+
+/** Test fixtures shared by the Storm specs: default ids and a small sample topology. */
+object TopologyUtil {
+  val DEFAULT_STREAM_ID = Utils.DEFAULT_STREAM_ID
+  val DEFAULT_COMPONENT_ID = "component"
+
+  /**
+   * Builds a topology with two word spouts ("1" with parallelism 5, "2" with parallelism 3),
+   * a word counter bolt "3" (parallelism 3) fed by both spouts grouped on the "word" field,
+   * and a global count bolt "4" fed by spout "1" through a global grouping.
+   */
+  def getTestTopology: StormTopology = {
+    val builder = new TopologyBuilder
+
+    // Spouts
+    builder.setSpout("1", new TestWordSpout(true), 5)
+    builder.setSpout("2", new TestWordSpout(true), 3)
+
+    // Word counter subscribed to both spouts
+    val wordCounter = builder.setBolt("3", new TestWordCounter(), 3)
+    wordCounter.fieldsGrouping("1", new Fields("word"))
+    wordCounter.fieldsGrouping("2", new Fields("word"))
+
+    // Global counter subscribed to the first spout only
+    val globalCounter = builder.setBolt("4", new TestGlobalCount())
+    globalCounter.globalGrouping("1")
+
+    builder.createTopology()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
deleted file mode 100644
index 6618b48..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn
-
-/** Configuration keys of the YARN experiment and placeholders taken from the YARN API. */
-object Constants {
-  import org.apache.hadoop.yarn.api.ApplicationConstants
-  import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
-  // Application master settings
-  val APPMASTER_NAME = "gearpump.yarn.applicationmaster.name"
-  val APPMASTER_COMMAND = "gearpump.yarn.applicationmaster.command"
-  val APPMASTER_MEMORY = "gearpump.yarn.applicationmaster.memory"
-  val APPMASTER_VCORES = "gearpump.yarn.applicationmaster.vcores"
-  val APPMASTER_QUEUE = "gearpump.yarn.applicationmaster.queue"
-
-  // Client settings
-  val PACKAGE_PATH = "gearpump.yarn.client.package-path"
-  val CONFIG_PATH = "gearpump.yarn.client.config-path"
-
-  // Master container settings
-  val MASTER_COMMAND = "gearpump.yarn.master.command"
-  val MASTER_MEMORY = "gearpump.yarn.master.memory"
-  val MASTER_VCORES = "gearpump.yarn.master.vcores"
-
-  // Worker container settings
-  val WORKER_COMMAND = "gearpump.yarn.worker.command"
-  val WORKER_CONTAINERS = "gearpump.yarn.worker.containers"
-  val WORKER_MEMORY = "gearpump.yarn.worker.memory"
-  val WORKER_VCORES = "gearpump.yarn.worker.vcores"
-
-  val SERVICES_ENABLED = "gearpump.yarn.services.enabled"
-
-  // Values provided by the YARN ApplicationConstants API
-  val LOCAL_DIRS = Environment.LOCAL_DIRS.$$()
-  val CONTAINER_ID = Environment.CONTAINER_ID.$$()
-  val LOG_DIR_EXPANSION_VAR = ApplicationConstants.LOG_DIR_EXPANSION_VAR
-  val NODEMANAGER_HOST = Environment.NM_HOST.$$()
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
deleted file mode 100644
index af871ab..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import com.typesafe.config.Config
-
-import io.gearpump.cluster.main.{Master, Worker}
-import io.gearpump.experiments.yarn.Constants._
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Constants
-
-/**
- * Command to start a YARN container.
- */
-trait Command {
-
-  /** Returns the command as a single string. */
-  def get: String
-
-  /** The string form of a command is the command itself. */
-  override def toString: String = this.get
-}
-
-/**
- * Base class of the container commands. It assembles a command line from a configured
- * executable, the class path, JVM properties, a main class and its CLI options.
- */
-abstract class AbstractCommand extends Command {
-  protected def config: Config
-  def version: String
-
-  /** Class path entries; `buildCommand` joins them with ':'. */
-  def classPath: Array[String] = {
-    val packEntries = Array("conf", "lib/daemon/*", "lib/*").map(entry => s"pack/$version/$entry")
-    "conf" +: packEntries
-  }
-
-  /**
-   * Builds the command line.
-   *
-   * @param java config key under which the executable is looked up
-   * @param properties JVM properties, joined with a single space
-   * @param mainClazz name of the main class
-   * @param cliOpts arguments of the main class, joined with a single space
-   * @return the command; its combined output is piped through tee into the stderr file under
-   *         LOG_DIR_EXPANSION_VAR
-   */
-  protected def buildCommand(
-      java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String])
-    : String = {
-    val executable = config.getString(java)
-    val classPathArg = classPath.mkString(":")
-    val jvmProperties = properties.mkString(" ")
-    val arguments = cliOpts.mkString(" ")
-
-    // `$$` emits a literal `$`, so the text $CLASSPATH is part of the resulting command.
-    s"$executable -cp $classPathArg:$$CLASSPATH $jvmProperties $mainClazz $arguments" +
-      s" 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr"
-  }
-
-  /** Class name of `any` without a trailing `$`. */
-  protected def clazz(any: AnyRef): String = any.getClass.getName.stripSuffix("$")
-}
-
-/** Command that starts the `Master` main class listening on `masterAddr`. */
-case class MasterCommand(config: Config, version: String, masterAddr: HostPort)
-  extends AbstractCommand {
-
-  def get: String = {
-    val host = masterAddr.host
-    val port = masterAddr.port
-
-    // (key, value) pairs rendered as -Dkey=value, in this order
-    val jvmProperties = Array(
-      s"${Constants.GEARPUMP_CLUSTER_MASTERS}.0" -> s"$host:$port",
-      s"${Constants.GEARPUMP_HOSTNAME}" -> s"$host",
-      s"${Constants.GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID}" -> s"${CONTAINER_ID}",
-      s"${Constants.GEARPUMP_HOME}" -> s"${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
-      s"${Constants.GEARPUMP_LOG_DAEMON_DIR}" -> s"${LOG_DIR_EXPANSION_VAR}",
-      s"${Constants.GEARPUMP_LOG_APPLICATION_DIR}" -> s"${LOG_DIR_EXPANSION_VAR}"
-    ).map { case (key, value) => s"-D$key=$value" }
-
-    val cliArguments = Array(s"-ip $host", s"-port $port")
-
-    buildCommand(MASTER_COMMAND, jvmProperties, clazz(Master), cliArguments)
-  }
-}
-
-/** Command that starts the `Worker` main class on `workerHost`, pointing at `masterAddr`. */
-case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String)
-  extends AbstractCommand {
-
-  def get: String = {
-    // (key, value) pairs rendered as -Dkey=value, in this order
-    val jvmProperties = Array(
-      s"${Constants.GEARPUMP_CLUSTER_MASTERS}.0" -> s"${masterAddr.host}:${masterAddr.port}",
-      s"${Constants.GEARPUMP_LOG_DAEMON_DIR}" -> s"${LOG_DIR_EXPANSION_VAR}",
-      s"${Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID}" -> s"${CONTAINER_ID}",
-      s"${Constants.GEARPUMP_HOME}" -> s"${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
-      s"${Constants.GEARPUMP_LOG_APPLICATION_DIR}" -> s"${LOG_DIR_EXPANSION_VAR}",
-      s"${Constants.GEARPUMP_HOSTNAME}" -> s"$workerHost"
-    ).map { case (key, value) => s"-D$key=$value" }
-
-    // The worker takes no CLI arguments.
-    val cliArguments = Array.empty[String]
-
-    buildCommand(WORKER_COMMAND, jvmProperties, clazz(Worker), cliArguments)
-  }
-}
-
-/** Command that starts `YarnAppMaster` with the given arguments. */
-case class AppMasterCommand(config: Config, version: String, args: Array[String])
-  extends AbstractCommand {
-
-  // Adds the dashboard, services and yarn entries to the class path.
-  override val classPath = "conf" +: Array(
-    "conf",
-    "dashboard",
-    "lib/*",
-    "lib/daemon/*",
-    "lib/services/*",
-    "lib/yarn/*"
-  ).map(entry => s"pack/$version/$entry")
-
-  def get: String = {
-    // (key, value) pairs rendered as -Dkey=value, in this order
-    val jvmProperties = Array(
-      s"${Constants.GEARPUMP_HOME}" -> s"${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
-      s"${Constants.GEARPUMP_FULL_SCALA_VERSION}" -> s"$version",
-      s"${Constants.GEARPUMP_LOG_DAEMON_DIR}" -> s"${LOG_DIR_EXPANSION_VAR}",
-      s"${Constants.GEARPUMP_LOG_APPLICATION_DIR}" -> s"${LOG_DIR_EXPANSION_VAR}",
-      s"${Constants.GEARPUMP_HOSTNAME}" -> s"${NODEMANAGER_HOST}"
-    ).map { case (key, value) => s"-D$key=$value" }
-
-    // The leading empty argument is kept: it contributes a space to the generated command.
-    val cliArguments = "" +: args
-
-    buildCommand(APPMASTER_COMMAND, jvmProperties, clazz(YarnAppMaster), cliArguments)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
deleted file mode 100644
index 6dd5011..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import scala.concurrent.Future
-
-import akka.actor._
-import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.services.main.Services
-import io.gearpump.transport.HostPort
-import io.gearpump.util.{ActorUtil, Constants, LogUtil}
-
-/**
- * Factory for the Props of a UI actor. The `UIService` companion object is the implementation
- * visible in this file.
- */
-trait UIFactory {
-  /**
-   * @param masters addresses of the masters
-   * @param host host passed on to the created actor
-   * @param port port passed on to the created actor
-   * @return the Props used to create the actor
-   */
-  def props(masters: List[HostPort], host: String, port: Int): Props
-}
-
-/**
- * Wrapper of UI server.
- *
- * On start the actor launches the UI server asynchronously; on stop it deletes the temporary
- * configuration file it created and kills the UI server.
- *
- * @param masters master addresses; the first one is handed to the UI server
- * @param host written to the service host and netty hostname settings
- * @param port written to the service http port setting
- */
-class UIService(masters: List[HostPort], host: String, port: Int) extends Actor {
-  private val LOG = LogUtil.getLogger(getClass)
-
-  private val supervisor = ActorUtil.getFullPath(context.system, context.parent.path)
-
-  // Written by start() on a dispatcher thread and read by postStop() on the actor thread,
-  // hence volatile.
-  @volatile private var configFile: java.io.File = null
-
-  private implicit val dispatcher = context.dispatcher
-
-  override def postStop(): Unit = {
-    val file = configFile
-    if (file != null) {
-      file.delete()
-      configFile = null
-    }
-
-    // TODO: fix this
-    // Hack around to Kill the UI server
-    Services.kill()
-  }
-
-  override def preStart(): Unit = {
-    // Launch off the actor thread. The failure is logged because nothing else observes this
-    // Future, so an exception (for example an empty master list) would otherwise vanish.
-    Future(start()).failed.foreach { ex =>
-      LOG.error("Failed to launch the UI server", ex)
-    }
-  }
-
-  /** Writes the effective configuration to a temporary file and launches the UI server. */
-  def start(): Unit = {
-    val mastersArg = masters.mkString(",")
-    LOG.info(s"Launching services -master $mastersArg")
-
-    configFile = java.io.File.createTempFile("uiserver", ".conf")
-
-    // System config with the service host, http port and netty hostname overridden
-    val config = context.system.settings.config.
-      withValue(Constants.GEARPUMP_SERVICE_HOST, ConfigValueFactory.fromAnyRef(host)).
-      withValue(Constants.GEARPUMP_SERVICE_HTTP, ConfigValueFactory.fromAnyRef(port.toString)).
-      withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(host))
-
-    ClusterConfig.saveConfig(config, configFile)
-
-    // Throws on an empty master list; preStart() logs that failure.
-    val master = masters.head
-
-    ConfigFactory.invalidateCaches()
-    launch(supervisor, master.host, master.port, configFile.toString)
-  }
-
-  // Launch the UI server
-  def launch(supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
-    Services.main(Array("-supervisor", supervisor, "-master", s"$masterHost:$masterPort"
-      , "-conf", configFile))
-  }
-
-  // This actor handles no messages; anything received is reported as an error.
-  override def receive: Receive = {
-    case _ =>
-      LOG.error("Unknown message received")
-  }
-}
-
-/** Default UIFactory: produces the Props of a UIService actor. */
-object UIService extends UIFactory {
-
-  /** Props that create a UIService from the given master list, host and port. */
-  override def props(masters: List[HostPort], host: String, port: Int): Props =
-    Props(new UIService(masters, host, port))
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
deleted file mode 100644
index 1df7fb9..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import java.io.IOException
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor._
-import akka.util.Timeout
-import com.typesafe.config.ConfigValueFactory
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClientToMaster._
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.experiments.yarn.Constants._
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig}
-import io.gearpump.transport.HostPort
-import io.gearpump.util._
-
-/**
- * YARN AppMaster. YARN AppMaster is responsible to start Gearpump masters, workers, UI server as
- * YARN containers.
- *
- * NOTE: It is different with Gearpump AppMaster. Gearpump AppMaster is a sub-process of worker.
- */
-class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
-    packagePath: String, hdfsConfDir: String,
-    uiFactory: UIFactory)
-  extends Actor {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private val akkaConf = context.system.settings.config
-  private val servicesEnabled = akkaConf.getString(SERVICES_ENABLED).toBoolean
-  private var uiStarted = false
-  private val host = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
-
-  private val port = Util.findFreePort().get
-
-  private val trackingURL = "http://" + host + ":" + port
-
-  // TODO: for now, only one master is supported.
-  private val masterCount = 1
-  private val masterMemory = akkaConf.getString(MASTER_MEMORY).toInt
-  private val masterVCores = akkaConf.getString(MASTER_VCORES).toInt
-
-  private var workerCount = akkaConf.getString(WORKER_CONTAINERS).toInt
-  private val workerMemory = akkaConf.getString(WORKER_MEMORY).toInt
-  private val workerVCores = akkaConf.getString(WORKER_VCORES).toInt
-
-  val rootPath = System.getProperty(Constants.GEARPUMP_FULL_SCALA_VERSION)
-
-  rmClient.start(self)
-  nmClient.start(self)
-
-  def receive: Receive = null
-
-  private def registerAppMaster(): Unit = {
-    val target = host + ":" + port
-    rmClient.registerAppMaster(host, port, trackingURL)
-  }
-
-  registerAppMaster
-  context.become(waitForAppMasterRegistered)
-
-  import io.gearpump.experiments.yarn.appmaster.YarnAppMaster._
-
-  def waitForAppMasterRegistered: Receive = {
-    case AppMasterRegistered =>
-      LOG.info("YarnAppMaster registration completed")
-      requestMasterContainers(masterCount)
-      context.become(startingMasters(remain = masterCount, List.empty[MasterInfo]))
-  }
-
-  private def startingMasters(remain: Int, masters: List[MasterInfo]): Receive = box {
-    case ContainersAllocated(containers) =>
-      LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: "
-        + containers.size)
-      val count = Math.min(containers.length, remain)
-      val newMasters = (0 until count).toList.map { index =>
-        val container = containers(index)
-        MasterInfo(container.getId, container.getNodeId, launchMaster(container))
-      }
-
-      // Stops un-used containers
-      containers.drop(count).map { container =>
-        nmClient.stopContainer(container.getId, container.getNodeId)
-      }
-
-      context.become(startingMasters(remain, newMasters ++ masters))
-    case ContainerStarted(containerId) =>
-      LOG.info(s"ContainerStarted: container ${containerId} started for master(remain=$remain) ")
-      if (remain > 1) {
-        context.become(startingMasters(remain - 1, masters))
-      } else {
-        requestWorkerContainers(workerCount)
-        context.become(startingWorkers(workerCount, masters, List.empty[WorkerInfo]))
-      }
-  }
-
-  private def box(receive: Receive): Receive = {
-    onError orElse receive orElse unHandled
-  }
-
-  private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo])
-  : Receive = {
-    box {
-      case ContainersAllocated(containers) =>
-        LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), " +
-          s"count: " + containers.size)
-
-        val count = Math.min(containers.length, remain)
-        val newWorkers = (0 until count).toList.map { index =>
-          val container = containers(index)
-          launchWorker(container, masters)
-          WorkerInfo(container.getId, container.getNodeId)
-        }
-
-        // Stops un-used containers
-        containers.drop(count).map { container =>
-          nmClient.stopContainer(container.getId, container.getNodeId)
-        }
-        context.become(startingWorkers(remain, masters, workers ++ newWorkers))
-      case ContainerStarted(containerId) =>
-        LOG.info(s"ContainerStarted: container $containerId started for worker(remain=$remain)")
-        // The last one
-        if (remain > 1) {
-          context.become(startingWorkers(remain - 1, masters, workers))
-        } else {
-          if (servicesEnabled && !uiStarted) {
-            context.actorOf(uiFactory.props(masters.map(_.host), host, port))
-            uiStarted = true
-          }
-          context.become(service(effectiveConfig(masters.map(_.host)), masters, workers))
-        }
-    }
-  }
-
-  private def effectiveConfig(masters: List[HostPort]): Config = {
-    val masterList = masters.map(pair => s"${pair.host}:${pair.port}")
-    val config = context.system.settings.config
-    config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
-      ConfigValueFactory.fromIterable(masterList.asJava))
-  }
-
-  private def onError: Receive = {
-    case ContainersCompleted(containers) =>
-      // TODO: we should recover the failed container from this...
-      containers.foreach { status =>
-        if (status.getExitStatus != 0) {
-          LOG.error(s"ContainersCompleted: container ${status.getContainerId}" +
-            s" failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}")
-        } else {
-          LOG.info(s"ContainersCompleted: container ${status.getContainerId} completed")
-        }
-      }
-    case ShutdownApplication =>
-      LOG.error("ShutdownApplication")
-      nmClient.stop()
-      rmClient.shutdownApplication()
-      context.stop(self)
-    case ResourceManagerException(ex) =>
-      LOG.error("ResourceManagerException: " + ex.getMessage, ex)
-      nmClient.stop()
-      rmClient.failApplication(ex)
-      context.stop(self)
-    case Kill =>
-      LOG.info("Kill: User asked to shutdown the application")
-      sender ! CommandResult(success = true)
-      self ! ShutdownApplication
-  }
-
-  private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo])
-  : Receive = box {
-    case GetActiveConfig(clientHost) =>
-      LOG.info("GetActiveConfig: Get active configuration for client: " + clientHost)
-      val filtered = ClusterConfig.filterOutDefaultConfig(
-        config.withValue(Constants.GEARPUMP_HOSTNAME,
-          ConfigValueFactory.fromAnyRef(clientHost)))
-      sender ! ActiveConfig(filtered)
-    case QueryVersion =>
-      LOG.info("QueryVersion")
-      sender ! Version(Util.version)
-    case QueryClusterInfo =>
-      LOG.info("QueryClusterInfo")
-      val masterContainers = masters.map { master =>
-        master.id.toString + s"(${master.nodeId.toString})"
-      }
-
-      val workerContainers = workers.map { worker =>
-        worker.id.toString + s"(${worker.nodeId.toString})"
-      }
-      sender ! ClusterInfo(masterContainers, workerContainers)
-    case AddMaster =>
-      sender ! CommandResult(success = false, "Not Implemented")
-    case RemoveMaster(masterId) =>
-      sender ! CommandResult(success = false, "Not Implemented")
-    case AddWorker(count) =>
-      if (count == 0) {
-        sender ! CommandResult(success = true)
-      } else {
-        LOG.info("AddWorker: Start to add new workers, count: " + count)
-        workerCount += count
-        requestWorkerContainers(count)
-        context.become(startingWorkers(count, masters, workers))
-        sender ! CommandResult(success = true)
-      }
-    case RemoveWorker(worker) =>
-      val workerId = ContainerId.fromString(worker)
-      LOG.info(s"RemoveWorker: remove worker $workerId")
-      val info = workers.find(_.id.toString == workerId.toString)
-      if (info.isDefined) {
-        nmClient.stopContainer(info.get.id, info.get.nodeId)
-        sender ! CommandResult(success = true)
-        val remainWorkers = workers.filter(_.id != info.get.id)
-        context.become(service(config, masters, remainWorkers))
-      } else {
-        sender ! CommandResult(success = false, "failed to find worker " + worker)
-      }
-  }
-
-  private def unHandled: Receive = {
-    case other =>
-      LOG.info(s"Received unknown message $other")
-  }
-
-  private def requestMasterContainers(masters: Int) = {
-    LOG.info(s"Request resource for masters($masters)")
-    val containers = (1 to masters).map(
-      i => Resource.newInstance(masterMemory, masterVCores)
-    ).toList
-    rmClient.requestContainers(containers)
-  }
-
-  private def launchMaster(container: Container): HostPort = {
-    LOG.info(s"Launch Master on container " + container.getNodeHttpAddress)
-    val host = container.getNodeId.getHost
-
-    val port = Util.findFreePort().get
-
-    LOG.info("=============PORT" + port)
-    val masterCommand = MasterCommand(akkaConf, rootPath, HostPort(host, port))
-    nmClient.launchCommand(container, masterCommand.get, packagePath, hdfsConfDir)
-    HostPort(host, port)
-  }
-
-  private def requestWorkerContainers(workers: Int): Unit = {
-    LOG.info(s"Request resource for workers($workers)")
-    val containers = (1 to workers).map(
-      i => Resource.newInstance(workerMemory, workerVCores)
-    ).toList
-
-    rmClient.requestContainers(containers)
-  }
-
-  private def launchWorker(container: Container, masters: List[MasterInfo]): Unit = {
-    LOG.info(s"Launch Worker on container " + container.getNodeHttpAddress)
-    val workerHost = container.getNodeId.getHost
-    val workerCommand = WorkerCommand(akkaConf, rootPath, masters.head.host, workerHost)
-    nmClient.launchCommand(container, workerCommand.get, packagePath, hdfsConfDir)
-  }
-}
-
-object YarnAppMaster extends AkkaApp with ArgumentsParser {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "conf" -> CLIOption[String]("<Gearpump configuration directory on HDFS>", required = true),
-    "package" -> CLIOption[String]("<Gearpump package path on HDFS>", required = true)
-  )
-
-  override def akkaConfig: Config = {
-    ClusterConfig.ui()
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-    implicit val system = ActorSystem("GearpumpAM", akkaConf)
-
-    val yarnConf = new YarnConfig()
-
-    val confDir = parse(args).getString("conf")
-    val packagePath = parse(args).getString("package")
-
-    LOG.info("HADOOP_CONF_DIR: " + System.getenv("HADOOP_CONF_DIR"))
-    LOG.info("YARN Resource Manager: " + yarnConf.resourceManager)
-
-    val rmClient = new RMClient(yarnConf)
-    val nmClient = new NMClient(yarnConf, akkaConf)
-    val appMaster = system.actorOf(Props(new YarnAppMaster(rmClient,
-      nmClient, packagePath, confDir, UIService)))
-
-    val daemon = system.actorOf(Props(new Daemon(appMaster)))
-    Await.result(system.whenTerminated, Duration.Inf)
-    LOG.info("YarnAppMaster is shutdown")
-  }
-
-  class Daemon(appMaster: ActorRef) extends Actor {
-    context.watch(appMaster)
-
-    override def receive: Actor.Receive = {
-      case Terminated(actor) =>
-        if (actor.compareTo(appMaster) == 0) {
-          LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, " +
-            s"shutting down current ActorSystem")
-          context.system.terminate()
-          context.stop(self)
-        }
-    }
-  }
-
-  case class ResourceManagerException(throwable: Throwable)
-  case object ShutdownApplication
-  case class ContainersRequest(containers: List[Resource])
-  case class ContainersAllocated(containers: List[Container])
-  case class ContainersCompleted(containers: List[ContainerStatus])
-  case class ContainerStarted(containerId: ContainerId)
-  case object AppMasterRegistered
-
-  case class GetActiveConfig(clientHost: String)
-
-  case object QueryClusterInfo
-  case class ClusterInfo(masters: List[String], workers: List[String]) {
-    override def toString: String = {
-      val separator = "\n"
-      val masterSection = "masters: " + separator + masters.mkString("\n") + "\n"
-
-      val workerSection = "workers: " + separator + workers.mkString("\n") + "\n"
-      masterSection + workerSection
-    }
-  }
-
-  case object Kill
-  case class ActiveConfig(config: Config)
-
-  case object QueryVersion
-
-  case class Version(version: String)
-
-  case class MasterInfo(id: ContainerId, nodeId: NodeId, host: HostPort)
-
-  case class WorkerInfo(id: ContainerId, nodeId: NodeId)
-
-  def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = {
-    val client = new HttpClient()
-    val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path"
-    val get = new GetMethod(appMasterPath)
-    var status = client.executeMethod(get)
-
-    if (status != 200) {
-      // Sleeps a little bit, and try again
-      Thread.sleep(3000)
-      status = client.executeMethod(get)
-    }
-
-    if (status == 200) {
-      AkkaHelper.actorFor(system, get.getResponseBodyAsString)
-    } else {
-      throw new IOException("Fail to resolve AppMaster address, please make sure " +
-        s"${report.getOriginalTrackingUrl} is accessible...")
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
deleted file mode 100644
index 49ec3a0..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.client
-
-import java.io.IOException
-import scala.util.Try
-
-import akka.actor.{ActorRef, ActorSystem}
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
-
-import io.gearpump.experiments.yarn.glue.Records.ApplicationId
-import io.gearpump.experiments.yarn.glue.YarnClient
-import io.gearpump.util.{AkkaHelper, LogUtil}
-
-/**
- * Resolves AppMaster ActorRef
- */
-class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
-  val LOG = LogUtil.getLogger(getClass)
-  val RETRY_INTERVAL_MS = 3000 // ms
-
-  def resolve(appId: ApplicationId, timeoutSeconds: Int = 30): ActorRef = {
-    val appMaster = retry(connect(appId), 1 + timeoutSeconds * 1000 / RETRY_INTERVAL_MS)
-    appMaster
-  }
-
-  private def connect(appId: ApplicationId): ActorRef = {
-    val report = yarnClient.getApplicationReport(appId)
-    val client = new HttpClient()
-    val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path"
-    LOG.info(s"appMasterPath=$appMasterPath")
-    val get = new GetMethod(appMasterPath)
-    val status = client.executeMethod(get)
-    if (status == 200) {
-      val response = get.getResponseBodyAsString
-      LOG.info("Successfully resolved AppMaster address: " + response)
-      AkkaHelper.actorFor(system, response)
-    } else {
-      throw new IOException("Fail to resolve AppMaster address, please make sure " +
-        s"${report.getOriginalTrackingUrl} is accessible...")
-    }
-  }
-
-  private def retry(fun: => ActorRef, times: Int): ActorRef = {
-    var index = 0
-    var result: ActorRef = null
-    while (index < times && result == null) {
-      Thread.sleep(RETRY_INTERVAL_MS)
-      index += 1
-      val tryConnect = Try(fun)
-      if (tryConnect.isFailure) {
-        LOG.error(s"Failed to connect YarnAppMaster(tried $index)... " +
-          tryConnect.failed.get.getMessage)
-      } else {
-        result = tryConnect.get
-      }
-    }
-    result
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
deleted file mode 100644
index 7c3bc38..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.yarn.client
-import org.slf4j.Logger
-
-import io.gearpump.util.LogUtil
-
-/** Command line tool to launch a Gearpump cluster on YARN, and also to manage Gearpump cluster */
-object Client {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  val LAUNCH = "launch"
-
-  val commands = Map(LAUNCH -> LaunchCluster) ++
-    ManageCluster.commands.map(key => (key, ManageCluster)).toMap
-
-  def usage(): Unit = {
-    val keys = commands.keys.toList.sorted
-    // scalastyle:off println
-    Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
-    // scalastyle:on println
-  }
-
-  def main(args: Array[String]): Unit = {
-    if (args.length == 0) {
-      usage()
-    } else {
-      val key = args(0)
-      val command = commands.get(key)
-      command match {
-        case Some(command) =>
-          if (key == LAUNCH) {
-            val remainArgs = args.drop(1)
-            command.main(remainArgs)
-          } else {
-            val commandArg = Array("-" + ManageCluster.COMMAND, key)
-            val remainArgs = args.drop(1)
-            val updatedArgs = commandArg ++ args.drop(1)
-            command.main(updatedArgs)
-          }
-        case None =>
-          usage
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
deleted file mode 100644
index ea5e707..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.yarn.client
-
-import java.io.{File, IOException, OutputStreamWriter}
-import java.net.InetAddress
-import java.util.zip.ZipInputStream
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-import akka.actor.ActorSystem
-import com.typesafe.config.{Config, ConfigValueFactory}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.experiments.yarn
-import io.gearpump.experiments.yarn.Constants
-import io.gearpump.experiments.yarn.appmaster.AppMasterCommand
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
-import io.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource}
-import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
-import io.gearpump.util.ActorUtil.askActor
-import io.gearpump.util.{AkkaApp, LogUtil, Util}
-
-/**
- * Launch Gearpump on YARN
- */
-class LaunchCluster(
-    akka: Config,
-    yarnConf: YarnConfig,
-    yarnClient: YarnClient,
-    fs: FileSystem,
-    actorSystem: ActorSystem,
-    appMasterResolver: AppMasterResolver,
-    version: String = Util.version) {
-
-  import io.gearpump.experiments.yarn.Constants._
-  private implicit val dispatcher = actorSystem.dispatcher
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private val host = InetAddress.getLocalHost.getHostName
-  private val queue = akka.getString(APPMASTER_QUEUE)
-  private val memory = akka.getString(APPMASTER_MEMORY).toInt
-  private val vcore = akka.getString(APPMASTER_VCORES).toInt
-
-  def submit(appName: String, packagePath: String): ApplicationId = {
-    LOG.info("Starting AM")
-
-    // First step, check the version, to make sure local version matches remote version
-    if (!packagePath.endsWith(".zip")) {
-      throw new IOException(s"YarnClient only support .zip distribution package," +
-        s" now it is ${packagePath}. Please download the zip " +
-        "package from website or use sbt assembly packArchiveZip to build one.")
-    }
-
-    if (!fs.exists(packagePath)) {
-      throw new IOException(s"Cannot find package ${packagePath} on HDFS ${fs.name}. ")
-    }
-
-    val rootEntry = rootEntryPath(zip = packagePath)
-
-    if (!rootEntry.contains(version)) {
-      throw new IOException(s"Check version failed! Local gearpump binary" +
-        s" version $version doesn't match with remote path $packagePath")
-    }
-
-    val resource = Resource.newInstance(memory, vcore)
-    val appId = yarnClient.createApplication
-
-    // uploads the configs to HDFS home directory of current user.
-    val configPath = uploadConfigToHDFS(appId)
-
-    val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath",
-      s"-package $packagePath"))
-
-    yarnClient.submit(appName, appId, command.get, resource, queue, packagePath, configPath)
-
-    LOG.info("Waiting application to finish...")
-    val report = yarnClient.awaitApplication(appId, LaunchCluster.TIMEOUT_MILLISECONDS)
-    LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} " +
-      s"at ${report.getFinishTime}, info: ${report.getDiagnostics}")
-
-    // scalastyle:off println
-    Console.println("================================================")
-    Console.println("==Application Id: " + appId)
-    // scalastyle:on println
-    appId
-  }
-
-  def saveConfig(appId: ApplicationId, output: String): Future[File] = {
-    LOG.info(s"Trying to download active configuration to output path: " + output)
-    LOG.info(s"Resolving YarnAppMaster ActorRef for application " + appId)
-    val appMaster = appMasterResolver.resolve(appId)
-    LOG.info(s"appMaster=${appMaster.path} host=$host")
-    val future = askActor[ActiveConfig](appMaster, GetActiveConfig(host)).map(_.config)
-    future.map { config =>
-      val out = new File(output)
-      ClusterConfig.saveConfig(config, out)
-      out
-    }
-  }
-
-  private def uploadConfigToHDFS(appId: ApplicationId): String = {
-    // Uses personal home directory so that it will not conflict with other users
-    // conf path pattern: /user/<userid>/.gearpump_application_<timestamp>_<id>/conf
-    val confDir = s"${fs.getHomeDirectory}/.gearpump_${appId}/conf/"
-    LOG.info(s"Uploading configuration files to remote HDFS(under $confDir)...")
-
-    // Copies config from local to remote.
-    val remoteConfFile = s"$confDir/gear.conf"
-    var out = fs.create(remoteConfFile)
-    var writer = new OutputStreamWriter(out)
-
-    val cleanedConfig = ClusterConfig.filterOutDefaultConfig(akka)
-
-    writer.write(cleanedConfig.root().render())
-    writer.close()
-
-    // Saves yarn-site.xml to remote
-    val yarn_site_xml = s"$confDir/yarn-site.xml"
-    out = fs.create(yarn_site_xml)
-    writer = new OutputStreamWriter(out)
-    yarnConf.writeXml(writer)
-    writer.close()
-
-    // Saves log4j.properties to remote
-    val log4j_properties = s"$confDir/log4j.properties"
-    val log4j = LogUtil.loadConfiguration
-    out = fs.create(log4j_properties)
-    writer = new OutputStreamWriter(out)
-    log4j.store(writer, "gearpump on yarn")
-    writer.close()
-    confDir.toString
-  }
-
-  private def rootEntryPath(zip: String): String = {
-    val stream = new ZipInputStream(fs.open(zip))
-    val entry = stream.getNextEntry()
-    val name = entry.getName
-    name.substring(0, entry.getName.indexOf("/"))
-  }
-}
-
-object LaunchCluster extends AkkaApp with ArgumentsParser {
-
-  val PACKAGE = "package"
-  val NAME = "name"
-  val VERBOSE = "verbose"
-  val OUTPUT = "output"
-
-  override protected def akkaConfig: Config = {
-    ClusterConfig.default()
-  }
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. " +
-      "If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false),
-    NAME -> CLIOption[String]("<Application name showed in YARN>", required = false,
-      defaultValue = Some("Gearpump")),
-    VERBOSE -> CLIOption("<print verbose log on console>", required = false,
-      defaultValue = Some(false)),
-    OUTPUT -> CLIOption("<output path for configuration file>", required = false,
-      defaultValue = None)
-  )
-  private val TIMEOUT_MILLISECONDS = 30 * 1000
-
-  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
-    val parsed = parse(args)
-    if (parsed.getBoolean(VERBOSE)) {
-      LogUtil.verboseLogToConsole()
-    }
-
-    val yarnConfig = new YarnConfig()
-    val fs = new FileSystem(yarnConfig)
-    val yarnClient = new YarnClient(yarnConfig)
-    val akkaConf = updateConf(inputAkkaConf, parsed)
-    val actorSystem = ActorSystem("launchCluster", akkaConf)
-    val appMasterResolver = new AppMasterResolver(yarnClient, actorSystem)
-
-    val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs,
-      actorSystem, appMasterResolver)
-
-    val name = parsed.getString(NAME)
-    val appId = client.submit(name, akkaConf.getString(Constants.PACKAGE_PATH))
-
-    if (parsed.exists(OUTPUT)) {
-      import scala.concurrent.duration._
-      Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)),
-        TIMEOUT_MILLISECONDS.milliseconds)
-    }
-
-    yarnClient.stop()
-    actorSystem.terminate()
-    Await.result(actorSystem.whenTerminated, Duration.Inf)
-  }
-
-  private def updateConf(akka: Config, parsed: ParseResult): Config = {
-    if (parsed.exists(PACKAGE)) {
-      akka.withValue(Constants.PACKAGE_PATH,
-        ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE)))
-    } else {
-      akka
-    }
-  }
-}
\ No newline at end of file