You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:33 UTC
[23/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
new file mode 100644
index 0000000..49afe05
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.producer
+
+import java.util.{List => JList}
+
+import backtype.storm.spout.ISpout
+import backtype.storm.utils.Utils
+import org.apache.gearpump.experiments.storm.util.StormOutputCollector
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import scala.collection.JavaConverters._
+
+class StormSpoutOutputCollectorSpec
+  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  // Checks that an emit on the spout collector is forwarded to the wrapped StormOutputCollector.
+  property("StormSpoutOutputCollector should call StormOutputCollector") {
+    val elementGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 1000))
+    val tupleValuesGen = Gen.listOf[AnyRef](elementGen)
+
+    forAll(tupleValuesGen) { (values: List[AnyRef]) =>
+      val delegate = mock[StormOutputCollector]
+      val underTest = new StormSpoutOutputCollector(delegate, mock[ISpout], false)
+      val emitted = values.asJava
+      // The third emit argument is null; the delegate must still get the stream id and values.
+      underTest.emit(Utils.DEFAULT_STREAM_ID, emitted, null)
+      verify(delegate).emit(Utils.DEFAULT_STREAM_ID, emitted)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
new file mode 100644
index 0000000..bdea50c
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.storm.topology
+
+import java.util.{Map => JMap}
+
+import akka.actor.ActorRef
+import backtype.storm.spout.{ISpout, SpoutOutputCollector}
+import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
+import backtype.storm.tuple.Tuple
+import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
+import org.apache.gearpump.experiments.storm.util.StormOutputCollector
+import org.apache.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
+import org.apache.gearpump.streaming.{DAG, MockUtil}
+import org.apache.gearpump.{Message, TimeStamp}
+import org.mockito.Matchers.{anyObject, eq => mockitoEq}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class GearpumpStormComponentSpec
+ extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+ // Drives GearpumpSpout and GearpumpBolt through start/next/tick against mocked Storm components.
+ property("GearpumpSpout lifecycle") {
+ val config = mock[JMap[AnyRef, AnyRef]]
+ val spout = mock[ISpout]
+ val taskContext = MockUtil.mockTaskContext
+ val appMaster = mock[ActorRef]
+ when(taskContext.appMaster).thenReturn(appMaster)
+ val getDAG = mock[ActorRef => DAG]
+ val dag = mock[DAG]
+ when(getDAG(appMaster)).thenReturn(dag)
+ val getTopologyContext = mock[(DAG, TaskId) => TopologyContext]
+ val topologyContext = mock[TopologyContext]
+ when(getTopologyContext(dag, taskContext.taskId)).thenReturn(topologyContext)
+ val getOutputCollector = mock[(TaskContext, TopologyContext) => StormSpoutOutputCollector]
+ val outputCollector = mock[StormSpoutOutputCollector]
+ when(getOutputCollector(taskContext, topologyContext)).thenReturn(outputCollector)
+ // Every factory function handed to GearpumpSpout below is a mock stubbed to return a mock.
+ val gearpumpSpout = GearpumpSpout(config, spout, getDAG, getTopologyContext,
+ getOutputCollector, ackEnabled = false, taskContext)
+
+ // Start: the wrapped spout must be opened with the config and the stubbed topology context.
+ val startTime = mock[StartTime]
+ gearpumpSpout.start(startTime)
+
+ verify(spout).open(mockitoEq(config), mockitoEq(topologyContext),
+ anyObject[SpoutOutputCollector])
+
+ // Next: a single next(message) must result in one nextTuple() call on the wrapped spout.
+ val message = mock[Message]
+ gearpumpSpout.next(message)
+
+ verify(spout).nextTuple()
+ }
+ // Same lifecycle for a bolt, repeated over generated timestamps and tick frequencies.
+ property("GearpumpBolt lifecycle") {
+ val timestampGen = Gen.chooseNum[Long](0L, 1000L)
+ val freqGen = Gen.chooseNum[Int](1, 100)
+ forAll(timestampGen, freqGen) { (timestamp: TimeStamp, freq: Int) =>
+ val config = mock[JMap[AnyRef, AnyRef]]
+ val bolt = mock[IBolt]
+ val taskContext = MockUtil.mockTaskContext
+ val appMaster = mock[ActorRef]
+ when(taskContext.appMaster).thenReturn(appMaster)
+ val getDAG = mock[ActorRef => DAG]
+ val dag = mock[DAG]
+ when(getDAG(appMaster)).thenReturn(dag)
+ val getTopologyContext = mock[(DAG, TaskId) => TopologyContext]
+ val topologyContext = mock[TopologyContext]
+ when(getTopologyContext(dag, taskContext.taskId)).thenReturn(topologyContext)
+ val getGeneralTopologyContext = mock[DAG => GeneralTopologyContext]
+ val generalTopologyContext = mock[GeneralTopologyContext]
+ when(getGeneralTopologyContext(dag)).thenReturn(generalTopologyContext)
+ val getOutputCollector = mock[(TaskContext, TopologyContext) => StormOutputCollector]
+ val stormOutputCollector = mock[StormOutputCollector]
+ when(getOutputCollector(taskContext, topologyContext)).thenReturn(stormOutputCollector)
+ val getTickTuple = mock[(GeneralTopologyContext, Int) => Tuple]
+ val tickTuple = mock[Tuple]
+ when(getTickTuple(mockitoEq(generalTopologyContext), anyObject[Int]())).thenReturn(tickTuple)
+ val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext,
+ getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext)
+
+ // Start: the wrapped bolt must be prepared with the config and the stubbed topology context.
+ val startTime = mock[StartTime]
+ gearpumpBolt.start(startTime)
+
+ verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext),
+ anyObject[OutputCollector])
+
+ // Next: the tuple stubbed for toTuple must reach the bolt and the collector gets the timestamp.
+ val gearpumpTuple = mock[GearpumpTuple]
+ val tuple = mock[Tuple]
+ when(gearpumpTuple.toTuple(generalTopologyContext, timestamp)).thenReturn(tuple)
+ val message = Message(gearpumpTuple, timestamp)
+ gearpumpBolt.next(message)
+
+ verify(stormOutputCollector).setTimestamp(timestamp)
+ verify(bolt).execute(tuple)
+
+ // Tick: the bolt must execute the tuple returned by the stubbed getTickTuple.
+ gearpumpBolt.tick(freq)
+ verify(bolt).execute(tickTuple)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
new file mode 100644
index 0000000..ef383ad
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.topology
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import backtype.storm.Config
+import org.apache.gearpump.experiments.storm.processor.StormProcessor
+import org.apache.gearpump.experiments.storm.producer.StormProducer
+import org.apache.gearpump.experiments.storm.util.TopologyUtil
+import org.apache.gearpump.streaming.MockUtil
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
+import scala.collection.JavaConverters._
+
+class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar {
+  import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopologySpec._
+  // Covers config merging, processor creation and target lookup of GearpumpStormTopology.
+  "GearpumpStormTopology" should {
+    "merge configs with defined priority" in {
+      val stormTopology = TopologyUtil.getTestTopology
+      val name = "name"
+      val sysVal = "sys"
+      val sysConfig = newJavaConfig(name, sysVal)
+      val appVal = "app"
+      val appConfig = newJavaConfig(name, appVal)
+
+      implicit val system = MockUtil.system
+      val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig,
+        newEmptyConfig)
+      topology1.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology1"
+      // `contain` on a java.util.Map matches entries, not keys, so the key is checked explicitly.
+      topology1.getStormConfig.containsKey(name) shouldBe false
+      // With only sysConfig setting the key, its value must show up in the merged config.
+      val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig,
+        newEmptyConfig)
+      topology2.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology2"
+      topology2.getStormConfig.get(name) shouldBe sysVal
+      // With both configs setting the key, the appConfig value must win.
+      val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig)
+      topology3.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology3"
+      topology3.getStormConfig.get(name) shouldBe appVal
+    }
+
+    "create Gearpump processors from Storm topology" in {
+      val stormTopology = TopologyUtil.getTestTopology
+      implicit val system = MockUtil.system
+      val gearpumpStormTopology =
+        GearpumpStormTopology("app", stormTopology, null)
+      val processors = gearpumpStormTopology.getProcessors
+      stormTopology.get_spouts().asScala.foreach { case (spoutId, _) =>
+        val processor = processors(spoutId)
+        processor.taskClass shouldBe classOf[StormProducer]
+        processor.description shouldBe spoutId
+      }
+      stormTopology.get_bolts().asScala.foreach { case (boltId, _) =>
+        val processor = processors(boltId)
+        processor.taskClass shouldBe classOf[StormProcessor]
+        processor.description shouldBe boltId
+      }
+    }
+
+    "get target processors from source id" in {
+      val stormTopology = TopologyUtil.getTestTopology
+      implicit val system = MockUtil.system
+      val gearpumpStormTopology =
+        GearpumpStormTopology("app", stormTopology, null)
+      val targets0 = gearpumpStormTopology.getTargets("1")
+      targets0 should contain key "3"
+      targets0 should contain key "4"
+      val targets1 = gearpumpStormTopology.getTargets("2")
+      targets1 should contain key "3"
+    }
+  }
+}
+
+object GearpumpStormTopologySpec {
+  /** Builds a fresh, mutable, empty config map on every call. */
+  def newEmptyConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+
+  /** Builds a fresh, mutable config map holding only the `key -> value` entry. */
+  def newJavaConfig(key: AnyRef, value: AnyRef): JMap[AnyRef, AnyRef] = {
+    val singleEntry = newEmptyConfig
+    singleEntry.put(key, value)
+    singleEntry
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
new file mode 100644
index 0000000..f12e54f
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.storm.topology
+
+import java.util.{List => JList}
+
+import backtype.storm.task.GeneralTopologyContext
+import backtype.storm.tuple.Fields
+import org.apache.gearpump.TimeStamp
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import scala.collection.JavaConverters._
+
+class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  // Checks the Storm tuple built by GearpumpTuple.toTuple against a stubbed topology context.
+  property("GearpumpTuple should create Storm Tuple") {
+    val gearpumpTupleGen = for {
+      fieldValues <- Gen.listOf[String](Gen.alphaStr).map(_.distinct.asJava.asInstanceOf[JList[AnyRef]])
+      taskId <- Gen.chooseNum[Int](0, Int.MaxValue)
+      streamId <- Gen.alphaStr
+    } yield new GearpumpTuple(fieldValues, new Integer(taskId), streamId, null)
+
+    forAll(gearpumpTupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) {
+      (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) =>
+        // The generated values are distinct strings, so they are reused as the field names.
+        val outputFields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*)
+        val context = mock[GeneralTopologyContext]
+        when(context.getComponentOutputFields(componentId, gearpumpTuple.sourceStreamId))
+          .thenReturn(outputFields)
+        when(context.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId)
+
+        val stormTuple = gearpumpTuple.toTuple(context, timestamp)
+        stormTuple shouldBe a[TimedTuple]
+        val timed = stormTuple.asInstanceOf[TimedTuple]
+        timed.getValues shouldBe gearpumpTuple.values
+        timed.getSourceTask shouldBe gearpumpTuple.sourceTaskId
+        timed.getSourceComponent shouldBe componentId
+        timed.getSourceStreamId shouldBe gearpumpTuple.sourceStreamId
+        timed.getMessageId shouldBe null
+        timed.getFields shouldBe outputFields
+        timed.timestamp shouldBe timestamp
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
new file mode 100644
index 0000000..9cf5009
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.experiments.storm.partitioner.StormPartitioner
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.task.Task
+
+class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar {
+  // Checks that GraphBuilder links processors according to the topology's reported targets.
+  "GraphBuilder" should {
+    "build Graph from Storm topology" in {
+      val (sourceId, targetId) = ("source", "target")
+      val sourceProcessor = mock[Processor[Task]]
+      val targetProcessor = mock[Processor[Task]]
+
+      // Stub a topology with two processors where only the source has a target.
+      val topology = mock[GearpumpStormTopology]
+      when(topology.getProcessors).thenReturn(
+        Map(sourceId -> sourceProcessor, targetId -> targetProcessor))
+      when(topology.getTargets(targetId)).thenReturn(Map.empty[String, Processor[Task]])
+      when(topology.getTargets(sourceId)).thenReturn(Map(targetId -> targetProcessor))
+
+      val edges = GraphBuilder.build(topology).edges
+
+      // Exactly one edge is expected: source to target through a StormPartitioner.
+      edges.size shouldBe 1
+      val (upstream, partitioner, downstream) = edges.head
+      upstream shouldBe sourceProcessor
+      partitioner shouldBe a[StormPartitioner]
+      downstream shouldBe targetProcessor
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala
new file mode 100644
index 0000000..c1cdb3b
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import java.util.{List => JList}
+import scala.collection.JavaConverters._
+
+import backtype.storm.generated.GlobalStreamId
+import backtype.storm.grouping.CustomStreamGrouping
+import backtype.storm.task.TopologyContext
+import backtype.storm.tuple.Fields
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.experiments.storm.util.GrouperSpec.Value
+
+class GrouperSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  // Property checks for the partitions chosen by each grouper implementation.
+  val taskIdGen = Gen.chooseNum[Int](0, 1000)
+  val valuesGen = Gen.listOf[String](Gen.alphaStr)
+    .map(_.asJava.asInstanceOf[JList[AnyRef]])
+  val numTasksGen = Gen.chooseNum[Int](1, 1000)
+
+  property("GlobalGrouper should always return partition 0") {
+    forAll(taskIdGen, valuesGen) { (taskId: Int, values: JList[AnyRef]) =>
+      val grouper = new GlobalGrouper
+      grouper.getPartitions(taskId, values) shouldBe List(0)
+    }
+  }
+
+  property("NoneGrouper should return partition in the range [0, numTasks)") {
+    forAll(taskIdGen, valuesGen, numTasksGen) {
+      (taskId: Int, values: JList[AnyRef], numTasks: Int) =>
+        val grouper = new NoneGrouper(numTasks)
+        val partitions = grouper.getPartitions(taskId, values)
+        partitions.size shouldBe 1
+        partitions.head should (be >= 0 and be < numTasks)
+    }
+  }
+
+  property("ShuffleGrouper should return partition in the range [0, numTasks)") {
+    forAll(taskIdGen, valuesGen, numTasksGen) {
+      (taskId: Int, values: JList[AnyRef], numTasks: Int) =>
+        val grouper = new ShuffleGrouper(numTasks)
+        val partitions = grouper.getPartitions(taskId, values)
+        partitions.size shouldBe 1
+        partitions.head should (be >= 0 and be < numTasks)
+    }
+  }
+
+  property("FieldsGrouper should return partition according to fields") {
+    forAll(taskIdGen, numTasksGen) {
+      (taskId: Int, numTasks: Int) =>
+        val values = 0.until(numTasks).map(i => new Value(i)) // hashCode of Value(i) is i
+        val fields = values.map(_.toString)
+        val outFields = new Fields(fields: _*)
+        values.flatMap { v =>
+          val groupFields = new Fields(v.toString)
+          val grouper = new FieldsGrouper(outFields, groupFields, numTasks)
+          grouper.getPartitions(taskId,
+            values.toList.asJava.asInstanceOf[JList[AnyRef]])
+        }.distinct.size shouldBe numTasks
+    }
+  }
+
+  property("AllGrouper should return all partitions") {
+    forAll(taskIdGen, numTasksGen, valuesGen) {
+      (taskId: Int, numTasks: Int, values: JList[AnyRef]) =>
+        val grouper = new AllGrouper(numTasks)
+        val partitions = grouper.getPartitions(taskId, values)
+        partitions.distinct.size shouldBe numTasks
+        partitions.min shouldBe 0
+        partitions.max shouldBe (numTasks - 1)
+    }
+  }
+
+  property("CustomGrouper should return partitions specified by user") {
+    val grouping = mock[CustomStreamGrouping]
+    val grouper = new CustomGrouper(grouping)
+    val topologyContext = mock[TopologyContext]
+    val globalStreamId = mock[GlobalStreamId]
+    val sourceTasks = mock[JList[Integer]]
+    // prepare must be forwarded unchanged to the user-supplied grouping.
+    grouper.prepare(topologyContext, globalStreamId, sourceTasks)
+
+    verify(grouping).prepare(topologyContext, globalStreamId, sourceTasks)
+    // getPartitions must report whichever task the grouping is stubbed to choose.
+    forAll(taskIdGen, valuesGen, numTasksGen) {(taskId: Int, values: JList[AnyRef], taskNum: Int) =>
+      0.until(taskNum).foreach { i =>
+        when(grouping.chooseTasks(taskId, values)).thenReturn(List(Int.box(i)).asJava)
+        grouper.getPartitions(taskId, values) shouldBe List(i)
+      }
+    }
+  }
+}
+
+object GrouperSpec {
+  /** Int-backed value whose string form, hash code and equality all derive from `i`. */
+  class Value(val i: Int) extends AnyRef {
+    /** Decimal rendering of `i`. */
+    override def toString: String = i.toString
+
+    /** The wrapped integer itself. */
+    override def hashCode(): Int = i
+
+    /** True only for another `Value` wrapping the same integer. */
+    override def equals(other: Any): Boolean = other match {
+      case that: Value => that.i == i
+      case _ => false
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
new file mode 100644
index 0000000..e0e9e61
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.storm.util
+
+import java.util.{List => JList, Map => JMap}
+import scala.collection.JavaConverters._
+
+import backtype.storm.generated.Grouping
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump._
+import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
+import org.apache.gearpump.streaming.MockUtil
+
+class StormOutputCollectorSpec
+ extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+ // Checks StormOutputCollector.emit and emitDirect using mocked target maps and partition lookup.
+ val stormTaskId = 0
+ val streamIdGen = Gen.alphaStr
+ val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
+ val timestampGen = Gen.chooseNum[Long](0L, 1000L)
+
+ property("StormOutputCollector emits tuple values into a stream") {
+ forAll(timestampGen, streamIdGen, valuesGen) {
+ (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) =>
+ val targets = mock[JMap[String, JMap[String, Grouping]]]
+ val taskToComponent = mock[JMap[Integer, String]]
+ val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
+ (Map[String, Array[Int]], JList[Integer])]
+ val targetPartitions = mock[Map[String, Array[Int]]]
+ val targetStormTaskIds = mock[JList[Integer]]
+ when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions,
+ targetStormTaskIds))
+ val taskContext = MockUtil.mockTaskContext
+ val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
+ targets, getTargetPartitionsFn, taskContext, LatestTime)
+ // Stream without targets: emit must return EMPTY_LIST and output nothing.
+ when(targets.containsKey(streamId)).thenReturn(false)
+ stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST
+ verify(taskContext, times(0)).output(anyObject[Message])
+ // Stream with targets: emit must return the stubbed task ids and output one timed message.
+ when(targets.containsKey(streamId)).thenReturn(true)
+ stormOutputCollector.setTimestamp(timestamp)
+ stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds
+ verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
+ case Message(tuple: GearpumpTuple, t) =>
+ val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
+ tuple == expected && t == timestamp
+ }))
+ }
+ }
+
+ property("StormOutputCollector emit direct to a task") {
+ val idGen = Gen.chooseNum[Int](0, 1000)
+ val targetGen = Gen.alphaStr
+ forAll(idGen, targetGen, timestampGen, streamIdGen, valuesGen) {
+ (id: Int, target: String, timestamp: Long, streamId: String, values: JList[AnyRef]) =>
+ val targets = mock[JMap[String, JMap[String, Grouping]]]
+ val taskToComponent = mock[JMap[Integer, String]]
+ when(taskToComponent.get(id)).thenReturn(target)
+ val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
+ (Map[String, Array[Int]], JList[Integer])]
+ val targetPartitions = mock[Map[String, Array[Int]]]
+ val targetStormTaskIds = mock[JList[Integer]]
+ when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions,
+ targetStormTaskIds))
+ val taskContext = MockUtil.mockTaskContext
+ val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
+ targets, getTargetPartitionsFn, taskContext, LatestTime)
+ // NOTE(review): no emit call is made before the verify below, so it cannot fail - confirm.
+ when(targets.containsKey(streamId)).thenReturn(false)
+ verify(taskContext, times(0)).output(anyObject[Message])
+ // Stream with targets: emitDirect must output one message for the partition derived from id.
+ when(targets.containsKey(streamId)).thenReturn(true)
+ stormOutputCollector.setTimestamp(timestamp)
+ stormOutputCollector.emitDirect(id, streamId, values)
+ val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index)
+ verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
+ case Message(tuple: GearpumpTuple, t) => {
+ val expected = new GearpumpTuple(values, stormTaskId, streamId,
+ Map(target -> partitions))
+
+ val result = tuple == expected && t == timestamp
+ result
+ }
+ }))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
new file mode 100644
index 0000000..e787c3d
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
+import scala.collection.JavaConverters._
+
+import akka.actor.ExtendedActorSystem
+import backtype.storm.utils.Utils
+import com.esotericsoftware.kryo.Kryo
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.streaming.MockUtil
+
+/**
+ * Checks that an initialized StormSerializationFramework hands out a StormSerializer, and that
+ * a StormSerializer round-trips GearpumpTuple values through serialize / deserialize.
+ */
+class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  property("StormSerializerPool should create and manage StormSerializer") {
+    val mockedContext = MockUtil.mockTaskContext
+    val framework = new StormSerializationFramework
+    // Kept implicit as in the original test; presumably consumed by UserConfig.withValue below.
+    implicit val actorSystem: ExtendedActorSystem =
+      mockedContext.system.asInstanceOf[ExtendedActorSystem]
+    val stormDefaults = Utils.readDefaultConfig.asInstanceOf[JMap[AnyRef, AnyRef]]
+    val userConfig =
+      UserConfig.empty.withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, stormDefaults)
+
+    framework.init(actorSystem, userConfig)
+    framework.get shouldBe a[StormSerializer]
+  }
+
+  property("StormSerializer should serialize and deserialize GearpumpTuple") {
+    val valuesGen: Gen[JList[AnyRef]] =
+      Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
+    val taskIdGen = Gen.chooseNum[Int](0, Int.MaxValue)
+    val streamIdGen = Gen.alphaStr
+
+    // Tuples are generated without target partitions (the last constructor argument is null).
+    val tupleGen = for {
+      values <- valuesGen
+      sourceTaskId <- taskIdGen
+      sourceStreamId <- streamIdGen
+    } yield new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, null)
+
+    // One Kryo instance is shared; a fresh serializer is built for every generated tuple.
+    val kryo = new Kryo
+    forAll(tupleGen) { (original: GearpumpTuple) =>
+      val serializer = new StormSerializer(kryo)
+      val roundTripped = serializer.deserialize(serializer.serialize(original))
+      roundTripped shouldBe original
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala
new file mode 100644
index 0000000..36d84cb
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.util.{HashMap => JHashMap, Map => JMap}
+import scala.collection.JavaConverters._
+
+import backtype.storm.Config
+import backtype.storm.generated.StormTopology
+import org.apache.storm.shade.org.json.simple.JSONValue
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.experiments.storm.util.StormUtil._
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.TaskId
+
+/**
+ * Property-based tests for the helpers imported from StormUtil: task id conversion, component
+ * lookup from UserConfig, JSON parsing, typed config getters, `mod` and `ackEnabled`.
+ */
+class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  property("convert Storm task ids to gearpump TaskIds and back") {
+    val idGen = Gen.chooseNum[Int](0, Int.MaxValue)
+    forAll(idGen) { (stormTaskId: Int) =>
+      gearpumpTaskIdToStorm(stormTaskIdToGearpump(stormTaskId)) shouldBe stormTaskId
+    }
+
+    // Both components are capped at Int.MaxValue >> 16.
+    // NOTE(review): presumably so processor id and index fit into one Int; confirm in StormUtil.
+    val processorIdGen = Gen.chooseNum[Int](0, Int.MaxValue >> 16)
+    val indexGen = Gen.chooseNum[Int](0, Int.MaxValue >> 16)
+    forAll(processorIdGen, indexGen) { (processorId: Int, index: Int) =>
+      val taskId = TaskId(processorId, index)
+      stormTaskIdToGearpump(gearpumpTaskIdToStorm(taskId)) shouldBe taskId
+    }
+  }
+
+  property("get GearpumpStormComponent from user config") {
+    val taskContext = MockUtil.mockTaskContext
+    val topology = TopologyUtil.getTestTopology
+    implicit val actorSystem = taskContext.system
+    val userConfig = UserConfig.empty
+      .withValue[StormTopology](STORM_TOPOLOGY, topology)
+      .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef])
+
+    // Every spout id of the test topology must resolve to a GearpumpSpout ...
+    topology.get_spouts.asScala.foreach { case (spoutId, _) =>
+      val config = userConfig.withString(STORM_COMPONENT, spoutId)
+      val component = getGearpumpStormComponent(taskContext, config)(taskContext.system)
+      component shouldBe a[GearpumpSpout]
+    }
+    // ... and every bolt id to a GearpumpBolt.
+    topology.get_bolts.asScala.foreach { case (boltId, _) =>
+      val config = userConfig.withString(STORM_COMPONENT, boltId)
+      val component = getGearpumpStormComponent(taskContext, config)(taskContext.system)
+      component shouldBe a[GearpumpBolt]
+    }
+  }
+
+  property("parse json to map") {
+    val mapGen = Gen.listOf[String](Gen.alphaStr)
+      .map(_.map(s => (s, s)).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]])
+
+    forAll(mapGen) { (map: JMap[AnyRef, AnyRef]) =>
+      parseJsonStringToMap(JSONValue.toJSONString(map)) shouldBe map
+    }
+
+    // null, the empty string and a bare number must all yield an empty map.
+    val invalidJsonGen: Gen[String] = Gen.oneOf(null, "", "1")
+    forAll(invalidJsonGen) { (invalidJson: String) =>
+      val map = parseJsonStringToMap(invalidJson)
+      map shouldBe empty
+      map shouldBe a[JMap[_, _]]
+    }
+  }
+
+  property("get int from config") {
+    val name = "int"
+    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    getInt(conf, name) shouldBe None
+    conf.put(name, null)
+    getInt(conf, name) shouldBe None
+
+    forAll(Gen.chooseNum[Int](Int.MinValue, Int.MaxValue)) { (int: Int) =>
+      conf.put(name, new Integer(int))
+      getInt(conf, name) shouldBe Some(int)
+    }
+
+    forAll(Gen.chooseNum[Long](Int.MinValue, Int.MaxValue)) { (long: Long) =>
+      conf.put(name, new JLong(long))
+      // Compare against an Int: the generator stays inside the Int range, and asserting against
+      // Some(long) only passed through Scala's cross-type boxed-number equality.
+      getInt(conf, name) shouldBe Some(long.toInt)
+    }
+
+    forAll(Gen.alphaStr) { (s: String) =>
+      conf.put(name, s)
+      an[IllegalArgumentException] should be thrownBy getInt(conf, name)
+    }
+  }
+
+  property("get boolean from config") {
+    val name = "boolean"
+    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    getBoolean(conf, name) shouldBe None
+    conf.put(name, null)
+    getBoolean(conf, name) shouldBe None
+
+    forAll(Gen.oneOf(true, false)) { (boolean: Boolean) =>
+      conf.put(name, new JBoolean(boolean))
+      getBoolean(conf, name) shouldBe Some(boolean)
+    }
+
+    forAll(Gen.alphaStr) { (s: String) =>
+      conf.put(name, s)
+      an[IllegalArgumentException] should be thrownBy getBoolean(conf, name)
+    }
+  }
+
+  property("mod should be correct") {
+    mod(10, 5) shouldBe 0
+    mod(10, 6) shouldBe 4
+    mod(10, -3) shouldBe -2
+    mod(-2, 5) shouldBe 3
+    mod(-1, -2) shouldBe -1
+  }
+
+  property("get whether ack enabled") {
+    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    ackEnabled(conf) shouldBe false
+    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, new Integer(0))
+    ackEnabled(conf) shouldBe false
+    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, null)
+    ackEnabled(conf) shouldBe true
+    forAll(Gen.chooseNum[Int](Int.MinValue, Int.MaxValue)) { (ackers: Int) =>
+      conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, new Integer(ackers))
+      // Acking is expected to be off only when the executor count is exactly zero.
+      ackEnabled(conf) shouldBe (ackers != 0)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala
new file mode 100644
index 0000000..886013c
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import backtype.storm.generated.StormTopology
+import backtype.storm.testing.{TestGlobalCount, TestWordCounter, TestWordSpout}
+import backtype.storm.topology.TopologyBuilder
+import backtype.storm.tuple.Fields
+import backtype.storm.utils.Utils
+
+/** Test fixture that builds a small Storm topology with two spouts and two bolts. */
+object TopologyUtil {
+  val DEFAULT_STREAM_ID: String = Utils.DEFAULT_STREAM_ID
+  val DEFAULT_COMPONENT_ID: String = "component"
+
+  /**
+   * Builds the test topology:
+   *  - spouts "1" (parallelism 5) and "2" (parallelism 3), both TestWordSpout
+   *  - bolt "3" (parallelism 3), a TestWordCounter fields-grouped on "word" from both spouts
+   *  - bolt "4", a TestGlobalCount globally grouped on spout "1"
+   */
+  def getTestTopology: StormTopology = {
+    val builder = new TopologyBuilder
+
+    builder.setSpout("1", new TestWordSpout(true), 5)
+    builder.setSpout("2", new TestWordSpout(true), 3)
+
+    val counterFromFirstSpout = builder
+      .setBolt("3", new TestWordCounter(), 3)
+      .fieldsGrouping("1", new Fields("word"))
+    counterFromFirstSpout.fieldsGrouping("2", new Fields("word"))
+
+    val globalCount = builder.setBolt("4", new TestGlobalCount())
+    globalCount.globalGrouping("1")
+
+    builder.createTopology()
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
deleted file mode 100644
index 6618b48..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn
-
-/**
- * Configuration keys and YARN environment placeholders used by the Gearpump YARN integration.
- */
-object Constants {
-  import org.apache.hadoop.yarn.api.ApplicationConstants
-  import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
-  // Keys under gearpump.yarn.applicationmaster
-  val APPMASTER_NAME: String = "gearpump.yarn.applicationmaster.name"
-  val APPMASTER_COMMAND: String = "gearpump.yarn.applicationmaster.command"
-  val APPMASTER_MEMORY: String = "gearpump.yarn.applicationmaster.memory"
-  val APPMASTER_VCORES: String = "gearpump.yarn.applicationmaster.vcores"
-  val APPMASTER_QUEUE: String = "gearpump.yarn.applicationmaster.queue"
-
-  // Keys under gearpump.yarn.client
-  val PACKAGE_PATH: String = "gearpump.yarn.client.package-path"
-  val CONFIG_PATH: String = "gearpump.yarn.client.config-path"
-
-  // Keys under gearpump.yarn.master
-  val MASTER_COMMAND: String = "gearpump.yarn.master.command"
-  val MASTER_MEMORY: String = "gearpump.yarn.master.memory"
-  val MASTER_VCORES: String = "gearpump.yarn.master.vcores"
-
-  // Keys under gearpump.yarn.worker
-  val WORKER_COMMAND: String = "gearpump.yarn.worker.command"
-  val WORKER_CONTAINERS: String = "gearpump.yarn.worker.containers"
-  val WORKER_MEMORY: String = "gearpump.yarn.worker.memory"
-  val WORKER_VCORES: String = "gearpump.yarn.worker.vcores"
-
-  val SERVICES_ENABLED: String = "gearpump.yarn.services.enabled"
-
-  // Values taken from Hadoop's ApplicationConstants, embedded verbatim into container commands.
-  val LOCAL_DIRS: String = Environment.LOCAL_DIRS.$$()
-  val CONTAINER_ID: String = Environment.CONTAINER_ID.$$()
-  val LOG_DIR_EXPANSION_VAR: String = ApplicationConstants.LOG_DIR_EXPANSION_VAR
-  val NODEMANAGER_HOST: String = Environment.NM_HOST.$$()
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
deleted file mode 100644
index af871ab..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import com.typesafe.config.Config
-
-import io.gearpump.cluster.main.{Master, Worker}
-import io.gearpump.experiments.yarn.Constants._
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Constants
-
-/**
- * Command to start a YARN container.
- *
- * `get` yields the command text; `toString` returns exactly the same text.
- */
-trait Command {
-  /** The command as a single string. */
-  def get: String
-
-  override def toString: String = this.get
-}
-
-/**
- * Base class for container commands: supplies the default class path, assembles the final
- * command line and resolves main class names.
- */
-abstract class AbstractCommand extends Command {
-  /** Configuration from which the executable for this command is looked up. */
-  protected def config: Config
-
-  /** Name of the directory under `pack/` that holds the unpacked package. */
-  def version: String
-
-  /** Class path entries, relative to the container working directory. */
-  def classPath: Array[String] = Array(
-    "conf",
-    s"pack/$version/conf",
-    s"pack/$version/lib/daemon/*",
-    s"pack/$version/lib/*"
-  )
-
-  /**
-   * Assembles the full command line.
-   *
-   * @param java config key whose string value is the executable to run
-   * @param properties JVM options placed before the main class
-   * @param mainClazz fully qualified name of the main class
-   * @param cliOpts arguments placed after the main class
-   * @return the command, with stderr merged into stdout and tee-appended to the `stderr` file
-   *         under LOG_DIR_EXPANSION_VAR
-   */
-  protected def buildCommand(
-      java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String])
-    : String = {
-    val executable = config.getString(java)
-    val classPathArg = classPath.mkString(":")
-    val jvmOptions = properties.mkString(" ")
-    val cliArgs = cliOpts.mkString(" ")
-
-    // "$CLASSPATH " is deliberately not interpolated: it is emitted literally into the command.
-    s"$executable -cp $classPathArg:" +
-      "$CLASSPATH " + jvmOptions +
-      s" $mainClazz $cliArgs 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr"
-  }
-
-  /** Runtime class name of `any`, minus the trailing `$` carried by Scala object classes. */
-  protected def clazz(any: AnyRef): String =
-    any.getClass.getName.stripSuffix("$")
-}
-
-/**
- * Command line that starts `Master` bound to `masterAddr`.
- */
-case class MasterCommand(config: Config, version: String, masterAddr: HostPort)
-  extends AbstractCommand {
-
-  def get: String = {
-    val host = masterAddr.host
-    val port = masterAddr.port
-    val cliArguments = Array(s"-ip $host", s"-port $port")
-
-    val jvmProperties = Array(
-      s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$host:$port",
-      s"-D${Constants.GEARPUMP_HOSTNAME}=$host",
-      s"-D${Constants.GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}",
-      s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
-      s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
-      s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}")
-
-    buildCommand(MASTER_COMMAND, jvmProperties, clazz(Master), cliArguments)
-  }
-}
-
-/**
- * Command line that starts `Worker` on `workerHost`, pointed at the master at `masterAddr`.
- */
-case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String)
-  extends AbstractCommand {
-
-  def get: String = {
-    val masterHost = masterAddr.host
-    val masterPort = masterAddr.port
-
-    val jvmProperties = Array(
-      s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$masterHost:$masterPort",
-      s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
-      s"-D${Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}",
-      s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
-      s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}",
-      s"-D${Constants.GEARPUMP_HOSTNAME}=$workerHost")
-
-    // The worker takes no command line arguments.
-    val noArguments = Array.empty[String]
-    buildCommand(WORKER_COMMAND, jvmProperties, clazz(Worker), noArguments)
-  }
-}
-
-/**
- * Command line that starts `YarnAppMaster`, forwarding `args` to it.
- *
- * Replaces the default class path with one that also covers the dashboard, services and yarn
- * directories of the package.
- */
-case class AppMasterCommand(config: Config, version: String, args: Array[String])
-  extends AbstractCommand {
-
-  override val classPath: Array[String] = {
-    val packageRoot = s"pack/$version"
-    Array(
-      "conf",
-      s"$packageRoot/conf",
-      s"$packageRoot/dashboard",
-      s"$packageRoot/lib/*",
-      s"$packageRoot/lib/daemon/*",
-      s"$packageRoot/lib/services/*",
-      s"$packageRoot/lib/yarn/*"
-    )
-  }
-
-  def get: String = {
-    val jvmProperties = Array(
-      s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
-      s"-D${Constants.GEARPUMP_FULL_SCALA_VERSION}=$version",
-      s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
-      s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}",
-      s"-D${Constants.GEARPUMP_HOSTNAME}=${NODEMANAGER_HOST}")
-
-    // An empty first argument is kept ahead of the user supplied ones, as before.
-    val cliArguments = "" +: args
-
-    buildCommand(APPMASTER_COMMAND, jvmProperties, clazz(YarnAppMaster), cliArguments)
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
deleted file mode 100644
index 6dd5011..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import scala.concurrent.Future
-
-import akka.actor._
-import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.services.main.Services
-import io.gearpump.transport.HostPort
-import io.gearpump.util.{ActorUtil, Constants, LogUtil}
-
-/** Factory for the Props of the actor that hosts the UI server. */
-trait UIFactory {
-  /**
-   * @param masters master addresses handed to the UI actor
-   * @param host host name handed to the UI actor
-   * @param port port handed to the UI actor
-   * @return Props from which the UI actor is created
-   */
-  def props(masters: List[HostPort], host: String, port: Int): Props
-}
-
-/**
- * Wrapper of UI server.
- *
- * On `preStart` the actor writes a temporary configuration file (system config plus the service
- * host/port overrides) and launches `Services.main` on the actor's dispatcher. On `postStop` it
- * deletes that file and calls `Services.kill()`.
- *
- * @param masters master addresses; the first one is passed to the UI server as `-master`
- * @param host value written for the service host and netty host name settings
- * @param port value written for the service HTTP port setting
- */
-class UIService(masters: List[HostPort], host: String, port: Int) extends Actor {
-  private val LOG = LogUtil.getLogger(getClass)
-
-  private val supervisor = ActorUtil.getFullPath(context.system, context.parent.path)
-
-  // Read once on the actor thread, so that start() (which runs inside a Future) never has to
-  // reach into the actor's `context` from a dispatcher thread.
-  private val systemConfig = context.system.settings.config
-
-  // Written by start() on a dispatcher thread, read by postStop() on the actor thread.
-  @volatile private var configFile: java.io.File = null
-
-  private implicit val dispatcher = context.dispatcher
-
-  override def postStop(): Unit = {
-    if (configFile != null) {
-      configFile.delete()
-      configFile = null
-    }
-
-    // TODO: fix this
-    // Hack around to Kill the UI server
-    Services.kill()
-  }
-
-  override def preStart(): Unit = {
-    // Run off the actor thread, but surface failures instead of dropping them silently.
-    Future(start()).failed.foreach { ex =>
-      LOG.error(s"Failed to launch UI services: ${ex.getMessage}", ex)
-    }
-  }
-
-  /** Writes the temporary config file and launches the UI server against the first master. */
-  def start(): Unit = {
-    val mastersArg = masters.mkString(",")
-    LOG.info(s"Launching services -master $mastersArg")
-
-    // Keep a local reference: postStop() may reset `configFile` to null concurrently.
-    val file = java.io.File.createTempFile("uiserver", ".conf")
-    configFile = file
-
-    val config = systemConfig.
-      withValue(Constants.GEARPUMP_SERVICE_HOST, ConfigValueFactory.fromAnyRef(host)).
-      withValue(Constants.GEARPUMP_SERVICE_HTTP, ConfigValueFactory.fromAnyRef(port.toString)).
-      withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(host))
-
-    ClusterConfig.saveConfig(config, file)
-
-    // Throws NoSuchElementException when `masters` is empty; preStart() logs that failure.
-    val master = masters.head
-
-    ConfigFactory.invalidateCaches()
-    launch(supervisor, master.host, master.port, file.toString)
-  }
-
-  // Launch the UI server
-  def launch(supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
-    Services.main(Array("-supervisor", supervisor, "-master", s"$masterHost:$masterPort"
-      , "-conf", configFile))
-  }
-
-  // This actor has no protocol of its own: every incoming message is reported as unknown.
-  override def receive: Receive = {
-    case _ =>
-      LOG.error(s"Unknown message received")
-  }
-}
-
-/** Default UIFactory: creates Props for a UIService actor. */
-object UIService extends UIFactory {
-  override def props(masters: List[HostPort], host: String, port: Int): Props =
-    Props(new UIService(masters, host, port))
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
deleted file mode 100644
index 1df7fb9..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import java.io.IOException
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor._
-import akka.util.Timeout
-import com.typesafe.config.ConfigValueFactory
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClientToMaster._
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.experiments.yarn.Constants._
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig}
-import io.gearpump.transport.HostPort
-import io.gearpump.util._
-
-/**
- * YARN AppMaster. YARN AppMaster is responsible for starting Gearpump masters, workers and the
- * UI server as YARN containers.
- *
- * NOTE: It is different from Gearpump AppMaster. Gearpump AppMaster is a sub-process of worker.
- *
- * Behavior chain installed by the code below:
- *  1. The constructor starts both clients with `self`, calls `registerAppMaster` and becomes
- *     `waitForAppMasterRegistered`.
- *  2. On `AppMasterRegistered` it requests master containers and becomes `startingMasters`.
- *  3. When the last master container reports `ContainerStarted` it requests worker containers
- *     and becomes `startingWorkers`.
- *  4. When the last worker container reports `ContainerStarted` it optionally creates the UI
- *     actor and becomes `service`, which answers client queries and commands.
- * Every state except `waitForAppMasterRegistered` is wrapped by `box`, which adds the shared
- * `onError` cases in front and the `unHandled` catch-all behind.
- *
- * @param rmClient used to register this AppMaster, request containers and shut down / fail the
- *                 application
- * @param nmClient used to launch commands in containers and to stop containers
- * @param packagePath forwarded unchanged to `nmClient.launchCommand`
- * @param hdfsConfDir forwarded unchanged to `nmClient.launchCommand`
- * @param uiFactory supplies the Props of the UI actor created once workers are up
- */
-class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
- packagePath: String, hdfsConfDir: String,
- uiFactory: UIFactory)
- extends Actor {
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
- private val akkaConf = context.system.settings.config
- private val servicesEnabled = akkaConf.getString(SERVICES_ENABLED).toBoolean
- private var uiStarted = false // set once the UI actor has been created, see startingWorkers
- private val host = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
-
- private val port = Util.findFreePort().get // NOTE(review): presumably a Try/Option; .get fails construction when no port is found
-
- private val trackingURL = "http://" + host + ":" + port
-
- // TODO: for now, only one master is supported.
- private val masterCount = 1
- private val masterMemory = akkaConf.getString(MASTER_MEMORY).toInt
- private val masterVCores = akkaConf.getString(MASTER_VCORES).toInt
-
- private var workerCount = akkaConf.getString(WORKER_CONTAINERS).toInt // grows by `count` on AddWorker
- private val workerMemory = akkaConf.getString(WORKER_MEMORY).toInt
- private val workerVCores = akkaConf.getString(WORKER_VCORES).toInt
-
- val rootPath = System.getProperty(Constants.GEARPUMP_FULL_SCALA_VERSION) // passed as 2nd argument to MasterCommand / WorkerCommand
-
- rmClient.start(self)
- nmClient.start(self)
-
- def receive: Receive = null // NOTE(review): placeholder; the constructor installs the real behavior via context.become below
-
- private def registerAppMaster(): Unit = {
- val target = host + ":" + port // NOTE(review): unused local
- rmClient.registerAppMaster(host, port, trackingURL)
- }
-
- registerAppMaster // executed as part of construction
- context.become(waitForAppMasterRegistered) // initial behavior
-
- import io.gearpump.experiments.yarn.appmaster.YarnAppMaster._ // members of the companion object
-
- def waitForAppMasterRegistered: Receive = {
- case AppMasterRegistered =>
- LOG.info("YarnAppMaster registration completed")
- requestMasterContainers(masterCount)
- context.become(startingMasters(remain = masterCount, List.empty[MasterInfo]))
- }
-
- private def startingMasters(remain: Int, masters: List[MasterInfo]): Receive = box {
- case ContainersAllocated(containers) =>
- LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: "
- + containers.size)
- val count = Math.min(containers.length, remain) // never launch more masters than still needed
- val newMasters = (0 until count).toList.map { index =>
- val container = containers(index)
- MasterInfo(container.getId, container.getNodeId, launchMaster(container))
- }
-
- // Stops un-used containers
- containers.drop(count).map { container => // map is used only for its side effect here
- nmClient.stopContainer(container.getId, container.getNodeId)
- }
-
- context.become(startingMasters(remain, newMasters ++ masters)) // remain only drops on ContainerStarted
- case ContainerStarted(containerId) =>
- LOG.info(s"ContainerStarted: container ${containerId} started for master(remain=$remain) ")
- if (remain > 1) { // more masters still have to report in
- context.become(startingMasters(remain - 1, masters))
- } else {
- requestWorkerContainers(workerCount)
- context.become(startingWorkers(workerCount, masters, List.empty[WorkerInfo]))
- }
- }
-
- private def box(receive: Receive): Receive = {
- onError orElse receive orElse unHandled // shared cases first, then the state, then the catch-all
- }
-
- private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo])
- : Receive = {
- box {
- case ContainersAllocated(containers) =>
- LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), " +
- s"count: " + containers.size)
-
- val count = Math.min(containers.length, remain) // never launch more workers than still needed
- val newWorkers = (0 until count).toList.map { index =>
- val container = containers(index)
- launchWorker(container, masters)
- WorkerInfo(container.getId, container.getNodeId)
- }
-
- // Stops un-used containers
- containers.drop(count).map { container => // map is used only for its side effect here
- nmClient.stopContainer(container.getId, container.getNodeId)
- }
- context.become(startingWorkers(remain, masters, workers ++ newWorkers)) // remain only drops on ContainerStarted
- case ContainerStarted(containerId) =>
- LOG.info(s"ContainerStarted: container $containerId started for worker(remain=$remain)")
- // The last one
- if (remain > 1) {
- context.become(startingWorkers(remain - 1, masters, workers))
- } else {
- if (servicesEnabled && !uiStarted) { // create the UI actor at most once
- context.actorOf(uiFactory.props(masters.map(_.host), host, port))
- uiStarted = true
- }
- context.become(service(effectiveConfig(masters.map(_.host)), masters, workers))
- }
- }
- }
-
- private def effectiveConfig(masters: List[HostPort]): Config = { // system config with the master list overridden
- val masterList = masters.map(pair => s"${pair.host}:${pair.port}")
- val config = context.system.settings.config
- config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
- ConfigValueFactory.fromIterable(masterList.asJava))
- }
-
- private def onError: Receive = { // cases shared by every boxed state
- case ContainersCompleted(containers) =>
- // TODO: we should recover the failed container from this...
- containers.foreach { status =>
- if (status.getExitStatus != 0) {
- LOG.error(s"ContainersCompleted: container ${status.getContainerId}" +
- s" failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}")
- } else {
- LOG.info(s"ContainersCompleted: container ${status.getContainerId} completed")
- }
- }
- case ShutdownApplication =>
- LOG.error("ShutdownApplication")
- nmClient.stop()
- rmClient.shutdownApplication()
- context.stop(self)
- case ResourceManagerException(ex) =>
- LOG.error("ResourceManagerException: " + ex.getMessage, ex)
- nmClient.stop()
- rmClient.failApplication(ex)
- context.stop(self)
- case Kill =>
- LOG.info("Kill: User asked to shutdown the application")
- sender ! CommandResult(success = true) // reply before the shutdown is carried out
- self ! ShutdownApplication // handled by the ShutdownApplication case above
- }
-
- private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo])
- : Receive = box {
- case GetActiveConfig(clientHost) =>
- LOG.info("GetActiveConfig: Get active configuration for client: " + clientHost)
- val filtered = ClusterConfig.filterOutDefaultConfig(
- config.withValue(Constants.GEARPUMP_HOSTNAME,
- ConfigValueFactory.fromAnyRef(clientHost)))
- sender ! ActiveConfig(filtered)
- case QueryVersion =>
- LOG.info("QueryVersion")
- sender ! Version(Util.version)
- case QueryClusterInfo =>
- LOG.info("QueryClusterInfo")
- val masterContainers = masters.map { master =>
- master.id.toString + s"(${master.nodeId.toString})"
- }
-
- val workerContainers = workers.map { worker =>
- worker.id.toString + s"(${worker.nodeId.toString})"
- }
- sender ! ClusterInfo(masterContainers, workerContainers)
- case AddMaster =>
- sender ! CommandResult(success = false, "Not Implemented")
- case RemoveMaster(masterId) =>
- sender ! CommandResult(success = false, "Not Implemented")
- case AddWorker(count) =>
- if (count == 0) {
- sender ! CommandResult(success = true)
- } else {
- LOG.info("AddWorker: Start to add new workers, count: " + count)
- workerCount += count
- requestWorkerContainers(count)
- context.become(startingWorkers(count, masters, workers)) // back to startingWorkers until `count` containers start
- sender ! CommandResult(success = true) // success is reported before the workers are actually up
- }
- case RemoveWorker(worker) =>
- val workerId = ContainerId.fromString(worker)
- LOG.info(s"RemoveWorker: remove worker $workerId")
- val info = workers.find(_.id.toString == workerId.toString)
- if (info.isDefined) {
- nmClient.stopContainer(info.get.id, info.get.nodeId)
- sender ! CommandResult(success = true)
- val remainWorkers = workers.filter(_.id != info.get.id)
- context.become(service(config, masters, remainWorkers)) // same state, without the removed worker
- } else {
- sender ! CommandResult(success = false, "failed to find worker " + worker)
- }
- }
-
- private def unHandled: Receive = { // catch-all appended last by box
- case other =>
- LOG.info(s"Received unknown message $other")
- }
-
- private def requestMasterContainers(masters: Int) = {
- LOG.info(s"Request resource for masters($masters)")
- val containers = (1 to masters).map( // one identical Resource request per master
- i => Resource.newInstance(masterMemory, masterVCores)
- ).toList
- rmClient.requestContainers(containers)
- }
-
- private def launchMaster(container: Container): HostPort = { // returns the address the master is told to bind to
- LOG.info(s"Launch Master on container " + container.getNodeHttpAddress)
- val host = container.getNodeId.getHost
-
- val port = Util.findFreePort().get // NOTE(review): the port is probed by this process, not on the container's host
-
- LOG.info("=============PORT" + port) // NOTE(review): looks like leftover debug logging
- val masterCommand = MasterCommand(akkaConf, rootPath, HostPort(host, port))
- nmClient.launchCommand(container, masterCommand.get, packagePath, hdfsConfDir)
- HostPort(host, port)
- }
-
- private def requestWorkerContainers(workers: Int): Unit = {
- LOG.info(s"Request resource for workers($workers)")
- val containers = (1 to workers).map( // one identical Resource request per worker
- i => Resource.newInstance(workerMemory, workerVCores)
- ).toList
-
- rmClient.requestContainers(containers)
- }
-
- private def launchWorker(container: Container, masters: List[MasterInfo]): Unit = {
- LOG.info(s"Launch Worker on container " + container.getNodeHttpAddress)
- val workerHost = container.getNodeId.getHost
- val workerCommand = WorkerCommand(akkaConf, rootPath, masters.head.host, workerHost) // only the first master is handed to the worker
- nmClient.launchCommand(container, workerCommand.get, packagePath, hdfsConfDir)
- }
-}
-
-object YarnAppMaster extends AkkaApp with ArgumentsParser {
- val LOG: Logger = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "conf" -> CLIOption[String]("<Gearpump configuration directory on HDFS>", required = true),
- "package" -> CLIOption[String]("<Gearpump package path on HDFS>", required = true)
- )
-
- override def akkaConfig: Config = {
- ClusterConfig.ui()
- }
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- implicit val timeout = Timeout(5, TimeUnit.SECONDS)
- implicit val system = ActorSystem("GearpumpAM", akkaConf)
-
- val yarnConf = new YarnConfig()
-
- val confDir = parse(args).getString("conf")
- val packagePath = parse(args).getString("package")
-
- LOG.info("HADOOP_CONF_DIR: " + System.getenv("HADOOP_CONF_DIR"))
- LOG.info("YARN Resource Manager: " + yarnConf.resourceManager)
-
- val rmClient = new RMClient(yarnConf)
- val nmClient = new NMClient(yarnConf, akkaConf)
- val appMaster = system.actorOf(Props(new YarnAppMaster(rmClient,
- nmClient, packagePath, confDir, UIService)))
-
- val daemon = system.actorOf(Props(new Daemon(appMaster)))
- Await.result(system.whenTerminated, Duration.Inf)
- LOG.info("YarnAppMaster is shutdown")
- }
-
- class Daemon(appMaster: ActorRef) extends Actor {
- context.watch(appMaster)
-
- override def receive: Actor.Receive = {
- case Terminated(actor) =>
- if (actor.compareTo(appMaster) == 0) {
- LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, " +
- s"shutting down current ActorSystem")
- context.system.terminate()
- context.stop(self)
- }
- }
- }
-
- case class ResourceManagerException(throwable: Throwable)
- case object ShutdownApplication
- case class ContainersRequest(containers: List[Resource])
- case class ContainersAllocated(containers: List[Container])
- case class ContainersCompleted(containers: List[ContainerStatus])
- case class ContainerStarted(containerId: ContainerId)
- case object AppMasterRegistered
-
- case class GetActiveConfig(clientHost: String)
-
- case object QueryClusterInfo
- case class ClusterInfo(masters: List[String], workers: List[String]) {
- override def toString: String = {
- val separator = "\n"
- val masterSection = "masters: " + separator + masters.mkString("\n") + "\n"
-
- val workerSection = "workers: " + separator + workers.mkString("\n") + "\n"
- masterSection + workerSection
- }
- }
-
- case object Kill
- case class ActiveConfig(config: Config)
-
- case object QueryVersion
-
- case class Version(version: String)
-
- case class MasterInfo(id: ContainerId, nodeId: NodeId, host: HostPort)
-
- case class WorkerInfo(id: ContainerId, nodeId: NodeId)
-
- def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = {
- val client = new HttpClient()
- val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path"
- val get = new GetMethod(appMasterPath)
- var status = client.executeMethod(get)
-
- if (status != 200) {
- // Sleeps a little bit, and try again
- Thread.sleep(3000)
- status = client.executeMethod(get)
- }
-
- if (status == 200) {
- AkkaHelper.actorFor(system, get.getResponseBodyAsString)
- } else {
- throw new IOException("Fail to resolve AppMaster address, please make sure " +
- s"${report.getOriginalTrackingUrl} is accessible...")
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
deleted file mode 100644
index 49ec3a0..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.client
-
-import java.io.IOException
-import scala.util.Try
-
-import akka.actor.{ActorRef, ActorSystem}
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
-
-import io.gearpump.experiments.yarn.glue.Records.ApplicationId
-import io.gearpump.experiments.yarn.glue.YarnClient
-import io.gearpump.util.{AkkaHelper, LogUtil}
-
-/**
- * Resolves AppMaster ActorRef
- */
-class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
- val LOG = LogUtil.getLogger(getClass)
- val RETRY_INTERVAL_MS = 3000 // ms
-
- def resolve(appId: ApplicationId, timeoutSeconds: Int = 30): ActorRef = {
- val appMaster = retry(connect(appId), 1 + timeoutSeconds * 1000 / RETRY_INTERVAL_MS)
- appMaster
- }
-
- private def connect(appId: ApplicationId): ActorRef = {
- val report = yarnClient.getApplicationReport(appId)
- val client = new HttpClient()
- val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path"
- LOG.info(s"appMasterPath=$appMasterPath")
- val get = new GetMethod(appMasterPath)
- val status = client.executeMethod(get)
- if (status == 200) {
- val response = get.getResponseBodyAsString
- LOG.info("Successfully resolved AppMaster address: " + response)
- AkkaHelper.actorFor(system, response)
- } else {
- throw new IOException("Fail to resolve AppMaster address, please make sure " +
- s"${report.getOriginalTrackingUrl} is accessible...")
- }
- }
-
- private def retry(fun: => ActorRef, times: Int): ActorRef = {
- var index = 0
- var result: ActorRef = null
- while (index < times && result == null) {
- Thread.sleep(RETRY_INTERVAL_MS)
- index += 1
- val tryConnect = Try(fun)
- if (tryConnect.isFailure) {
- LOG.error(s"Failed to connect YarnAppMaster(tried $index)... " +
- tryConnect.failed.get.getMessage)
- } else {
- result = tryConnect.get
- }
- }
- result
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
deleted file mode 100644
index 7c3bc38..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.yarn.client
-import org.slf4j.Logger
-
-import io.gearpump.util.LogUtil
-
-/** Command line tool to launch a Gearpump cluster on YARN, and also to manage Gearpump cluster */
-object Client {
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
- val LAUNCH = "launch"
-
- val commands = Map(LAUNCH -> LaunchCluster) ++
- ManageCluster.commands.map(key => (key, ManageCluster)).toMap
-
- def usage(): Unit = {
- val keys = commands.keys.toList.sorted
- // scalastyle:off println
- Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
- // scalastyle:on println
- }
-
- def main(args: Array[String]): Unit = {
- if (args.length == 0) {
- usage()
- } else {
- val key = args(0)
- val command = commands.get(key)
- command match {
- case Some(command) =>
- if (key == LAUNCH) {
- val remainArgs = args.drop(1)
- command.main(remainArgs)
- } else {
- val commandArg = Array("-" + ManageCluster.COMMAND, key)
- val remainArgs = args.drop(1)
- val updatedArgs = commandArg ++ args.drop(1)
- command.main(updatedArgs)
- }
- case None =>
- usage
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
deleted file mode 100644
index ea5e707..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.yarn.client
-
-import java.io.{File, IOException, OutputStreamWriter}
-import java.net.InetAddress
-import java.util.zip.ZipInputStream
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-import akka.actor.ActorSystem
-import com.typesafe.config.{Config, ConfigValueFactory}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.experiments.yarn
-import io.gearpump.experiments.yarn.Constants
-import io.gearpump.experiments.yarn.appmaster.AppMasterCommand
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
-import io.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource}
-import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
-import io.gearpump.util.ActorUtil.askActor
-import io.gearpump.util.{AkkaApp, LogUtil, Util}
-
-/**
- * Launch Gearpump on YARN
- */
-class LaunchCluster(
- akka: Config,
- yarnConf: YarnConfig,
- yarnClient: YarnClient,
- fs: FileSystem,
- actorSystem: ActorSystem,
- appMasterResolver: AppMasterResolver,
- version: String = Util.version) {
-
- import io.gearpump.experiments.yarn.Constants._
- private implicit val dispatcher = actorSystem.dispatcher
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
- private val host = InetAddress.getLocalHost.getHostName
- private val queue = akka.getString(APPMASTER_QUEUE)
- private val memory = akka.getString(APPMASTER_MEMORY).toInt
- private val vcore = akka.getString(APPMASTER_VCORES).toInt
-
- def submit(appName: String, packagePath: String): ApplicationId = {
- LOG.info("Starting AM")
-
- // First step, check the version, to make sure local version matches remote version
- if (!packagePath.endsWith(".zip")) {
- throw new IOException(s"YarnClient only support .zip distribution package," +
- s" now it is ${packagePath}. Please download the zip " +
- "package from website or use sbt assembly packArchiveZip to build one.")
- }
-
- if (!fs.exists(packagePath)) {
- throw new IOException(s"Cannot find package ${packagePath} on HDFS ${fs.name}. ")
- }
-
- val rootEntry = rootEntryPath(zip = packagePath)
-
- if (!rootEntry.contains(version)) {
- throw new IOException(s"Check version failed! Local gearpump binary" +
- s" version $version doesn't match with remote path $packagePath")
- }
-
- val resource = Resource.newInstance(memory, vcore)
- val appId = yarnClient.createApplication
-
- // uploads the configs to HDFS home directory of current user.
- val configPath = uploadConfigToHDFS(appId)
-
- val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath",
- s"-package $packagePath"))
-
- yarnClient.submit(appName, appId, command.get, resource, queue, packagePath, configPath)
-
- LOG.info("Waiting application to finish...")
- val report = yarnClient.awaitApplication(appId, LaunchCluster.TIMEOUT_MILLISECONDS)
- LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} " +
- s"at ${report.getFinishTime}, info: ${report.getDiagnostics}")
-
- // scalastyle:off println
- Console.println("================================================")
- Console.println("==Application Id: " + appId)
- // scalastyle:on println
- appId
- }
-
- def saveConfig(appId: ApplicationId, output: String): Future[File] = {
- LOG.info(s"Trying to download active configuration to output path: " + output)
- LOG.info(s"Resolving YarnAppMaster ActorRef for application " + appId)
- val appMaster = appMasterResolver.resolve(appId)
- LOG.info(s"appMaster=${appMaster.path} host=$host")
- val future = askActor[ActiveConfig](appMaster, GetActiveConfig(host)).map(_.config)
- future.map { config =>
- val out = new File(output)
- ClusterConfig.saveConfig(config, out)
- out
- }
- }
-
- private def uploadConfigToHDFS(appId: ApplicationId): String = {
- // Uses personal home directory so that it will not conflict with other users
- // conf path pattern: /user/<userid>/.gearpump_application_<timestamp>_<id>/conf
- val confDir = s"${fs.getHomeDirectory}/.gearpump_${appId}/conf/"
- LOG.info(s"Uploading configuration files to remote HDFS(under $confDir)...")
-
- // Copies config from local to remote.
- val remoteConfFile = s"$confDir/gear.conf"
- var out = fs.create(remoteConfFile)
- var writer = new OutputStreamWriter(out)
-
- val cleanedConfig = ClusterConfig.filterOutDefaultConfig(akka)
-
- writer.write(cleanedConfig.root().render())
- writer.close()
-
- // Saves yarn-site.xml to remote
- val yarn_site_xml = s"$confDir/yarn-site.xml"
- out = fs.create(yarn_site_xml)
- writer = new OutputStreamWriter(out)
- yarnConf.writeXml(writer)
- writer.close()
-
- // Saves log4j.properties to remote
- val log4j_properties = s"$confDir/log4j.properties"
- val log4j = LogUtil.loadConfiguration
- out = fs.create(log4j_properties)
- writer = new OutputStreamWriter(out)
- log4j.store(writer, "gearpump on yarn")
- writer.close()
- confDir.toString
- }
-
- private def rootEntryPath(zip: String): String = {
- val stream = new ZipInputStream(fs.open(zip))
- val entry = stream.getNextEntry()
- val name = entry.getName
- name.substring(0, entry.getName.indexOf("/"))
- }
-}
-
-object LaunchCluster extends AkkaApp with ArgumentsParser {
-
- val PACKAGE = "package"
- val NAME = "name"
- val VERBOSE = "verbose"
- val OUTPUT = "output"
-
- override protected def akkaConfig: Config = {
- ClusterConfig.default()
- }
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. " +
- "If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false),
- NAME -> CLIOption[String]("<Application name showed in YARN>", required = false,
- defaultValue = Some("Gearpump")),
- VERBOSE -> CLIOption("<print verbose log on console>", required = false,
- defaultValue = Some(false)),
- OUTPUT -> CLIOption("<output path for configuration file>", required = false,
- defaultValue = None)
- )
- private val TIMEOUT_MILLISECONDS = 30 * 1000
-
- override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
- val parsed = parse(args)
- if (parsed.getBoolean(VERBOSE)) {
- LogUtil.verboseLogToConsole()
- }
-
- val yarnConfig = new YarnConfig()
- val fs = new FileSystem(yarnConfig)
- val yarnClient = new YarnClient(yarnConfig)
- val akkaConf = updateConf(inputAkkaConf, parsed)
- val actorSystem = ActorSystem("launchCluster", akkaConf)
- val appMasterResolver = new AppMasterResolver(yarnClient, actorSystem)
-
- val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs,
- actorSystem, appMasterResolver)
-
- val name = parsed.getString(NAME)
- val appId = client.submit(name, akkaConf.getString(Constants.PACKAGE_PATH))
-
- if (parsed.exists(OUTPUT)) {
- import scala.concurrent.duration._
- Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)),
- TIMEOUT_MILLISECONDS.milliseconds)
- }
-
- yarnClient.stop()
- actorSystem.terminate()
- Await.result(actorSystem.whenTerminated, Duration.Inf)
- }
-
- private def updateConf(akka: Config, parsed: ParseResult): Config = {
- if (parsed.exists(PACKAGE)) {
- akka.withValue(Constants.PACKAGE_PATH,
- ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE)))
- } else {
- akka
- }
- }
-}
\ No newline at end of file