You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:45 UTC
[35/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
index eb2bc6e..319ee27 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,25 +17,28 @@
*/
package io.gearpump.experiments.storm.topology
-import akka.actor.ActorRef
-import backtype.storm.spout.{SpoutOutputCollector, ISpout}
-import backtype.storm.task.{OutputCollector, IBolt, GeneralTopologyContext, TopologyContext}
import java.util.{Map => JMap}
+
+import akka.actor.ActorRef
+import backtype.storm.spout.{ISpout, SpoutOutputCollector}
+import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
import backtype.storm.tuple.Tuple
-import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector
-import io.gearpump.{TimeStamp, Message}
-import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
-import io.gearpump.experiments.storm.util.StormOutputCollector
-import io.gearpump.streaming.{MockUtil, DAG}
-import io.gearpump.streaming.task.{TaskContext, StartTime, TaskId}
+import org.mockito.Matchers.{anyObject, eq => mockitoEq}
import org.mockito.Mockito._
-import org.mockito.Matchers.{anyLong, anyObject, eq => mockitoEq}
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
-import org.scalatest.{PropSpec, Matchers}
+import org.scalatest.{Matchers, PropSpec}
-class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector
+import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
+import io.gearpump.experiments.storm.util.StormOutputCollector
+import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
+import io.gearpump.streaming.{DAG, MockUtil}
+import io.gearpump.{Message, TimeStamp}
+
+class GearpumpStormComponentSpec
+ extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
property("GearpumpSpout lifecycle") {
val config = mock[JMap[AnyRef, AnyRef]]
@@ -56,13 +59,14 @@ class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Match
val gearpumpSpout = GearpumpSpout(config, spout, getDAG, getTopologyContext,
getOutputCollector, ackEnabled = false, taskContext)
- // start
+ // Start
val startTime = mock[StartTime]
gearpumpSpout.start(startTime)
- verify(spout).open(mockitoEq(config), mockitoEq(topologyContext), anyObject[SpoutOutputCollector])
+ verify(spout).open(mockitoEq(config), mockitoEq(topologyContext),
+ anyObject[SpoutOutputCollector])
- // next
+ // Next
val message = mock[Message]
gearpumpSpout.next(message)
@@ -93,16 +97,17 @@ class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Match
val getTickTuple = mock[(GeneralTopologyContext, Int) => Tuple]
val tickTuple = mock[Tuple]
when(getTickTuple(mockitoEq(generalTopologyContext), anyObject[Int]())).thenReturn(tickTuple)
- val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext, getGeneralTopologyContext,
- getOutputCollector, getTickTuple, taskContext)
+ val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext,
+ getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext)
- // start
+ // Start
val startTime = mock[StartTime]
gearpumpBolt.start(startTime)
- verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext), anyObject[OutputCollector])
+ verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext),
+ anyObject[OutputCollector])
- // next
+ // Next
val gearpumpTuple = mock[GearpumpTuple]
val tuple = mock[Tuple]
when(gearpumpTuple.toTuple(generalTopologyContext, timestamp)).thenReturn(tuple)
@@ -112,11 +117,9 @@ class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Match
verify(stormOutputCollector).setTimestamp(timestamp)
verify(bolt).execute(tuple)
-
- // tick
+ // Tick
gearpumpBolt.tick(freq)
verify(bolt).execute(tickTuple)
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
index 8f10886..f9ffc5f 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,16 +19,16 @@
package io.gearpump.experiments.storm.topology
import java.util.{HashMap => JHashMap, Map => JMap}
+import scala.collection.JavaConverters._
import backtype.storm.Config
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
import io.gearpump.experiments.storm.processor.StormProcessor
import io.gearpump.experiments.storm.producer.StormProducer
import io.gearpump.experiments.storm.util.TopologyUtil
import io.gearpump.streaming.MockUtil
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, WordSpec}
-
-import scala.collection.JavaConversions._
class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar {
import io.gearpump.experiments.storm.topology.GearpumpStormTopologySpec._
@@ -43,18 +43,19 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
val appConfig = newJavaConfig(name, appVal)
implicit val system = MockUtil.system
- val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, newEmptyConfig)
- topology1.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology1"
+ val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig,
+ newEmptyConfig)
+ topology1.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology1"
topology1.getStormConfig should not contain name
- val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, newEmptyConfig)
- topology2.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology2"
+ val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig,
+ newEmptyConfig)
+ topology2.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology2"
topology2.getStormConfig.get(name) shouldBe sysVal
val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig)
- topology3.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology3"
+ topology3.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology3"
topology3.getStormConfig.get(name) shouldBe appVal
-
}
"create Gearpump processors from Storm topology" in {
@@ -63,12 +64,12 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
val gearpumpStormTopology =
GearpumpStormTopology("app", stormTopology, null)
val processors = gearpumpStormTopology.getProcessors
- stormTopology.get_spouts().foreach { case (spoutId, _) =>
+ stormTopology.get_spouts().asScala.foreach { case (spoutId, _) =>
val processor = processors(spoutId)
processor.taskClass shouldBe classOf[StormProducer]
processor.description shouldBe spoutId
}
- stormTopology.get_bolts().foreach { case (boltId, _) =>
+ stormTopology.get_bolts().asScala.foreach { case (boltId, _) =>
val processor = processors(boltId)
processor.taskClass shouldBe classOf[StormProcessor]
processor.description shouldBe boltId
@@ -88,7 +89,6 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
targets1 should contain key "3"
}
}
-
}
object GearpumpStormTopologySpec {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
index 4ccdd39..f454145 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,17 @@
package io.gearpump.experiments.storm.topology
import java.util.{List => JList}
+import scala.collection.JavaConverters._
import backtype.storm.task.GeneralTopologyContext
import backtype.storm.tuple.Fields
-import io.gearpump.TimeStamp
import org.mockito.Mockito._
import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+import org.scalatest.{Matchers, PropSpec}
+
+import io.gearpump.TimeStamp
class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
@@ -42,14 +42,14 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with
forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) {
(gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) =>
val topologyContext = mock[GeneralTopologyContext]
- val fields = new Fields(gearpumpTuple.values.map(_.asInstanceOf[String]): _*)
+ val fields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*)
when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId)
when(topologyContext.getComponentOutputFields(
componentId, gearpumpTuple.sourceStreamId)).thenReturn(fields)
val tuple = gearpumpTuple.toTuple(topologyContext, timestamp)
- tuple shouldBe a [TimedTuple]
+ tuple shouldBe a[TimedTuple]
val timedTuple = tuple.asInstanceOf[TimedTuple]
timedTuple.getValues shouldBe gearpumpTuple.values
timedTuple.getSourceTask shouldBe gearpumpTuple.sourceTaskId
@@ -60,5 +60,4 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with
timedTuple.timestamp shouldBe timestamp
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
index 4efe06c..27c02fe 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,13 +18,14 @@
package io.gearpump.experiments.storm.util
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
import io.gearpump.experiments.storm.partitioner.StormPartitioner
import io.gearpump.experiments.storm.topology.GearpumpStormTopology
import io.gearpump.streaming.Processor
import io.gearpump.streaming.task.Task
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, WordSpec}
class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar {
@@ -47,7 +48,7 @@ class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar {
graph.edges.size shouldBe 1
val (from, edge, to) = graph.edges.head
from shouldBe sourceProcessor
- edge shouldBe a [StormPartitioner]
+ edge shouldBe a[StormPartitioner]
to shouldBe targetProcessor
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
index 7566f2f..599ea8d 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,18 +19,19 @@
package io.gearpump.experiments.storm.util
import java.util.{List => JList}
+import scala.collection.JavaConverters._
+
import backtype.storm.generated.GlobalStreamId
import backtype.storm.grouping.CustomStreamGrouping
import backtype.storm.task.TopologyContext
import backtype.storm.tuple.Fields
-import io.gearpump.experiments.storm.util.GrouperSpec.Value
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, PropSpec}
import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
-import scala.collection.JavaConverters._
+import io.gearpump.experiments.storm.util.GrouperSpec.Value
class GrouperSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
@@ -77,7 +78,7 @@ class GrouperSpec extends PropSpec with PropertyChecks with Matchers with Mockit
val grouper = new FieldsGrouper(outFields, groupFields, numTasks)
grouper.getPartitions(taskId,
values.toList.asJava.asInstanceOf[JList[AnyRef]])
- }.distinct.size shouldBe numTasks
+ }.distinct.size shouldBe numTasks
}
}
@@ -103,8 +104,8 @@ class GrouperSpec extends PropSpec with PropertyChecks with Matchers with Mockit
verify(grouping).prepare(topologyContext, globalStreamId, sourceTasks)
- forAll(taskIdGen, valuesGen, numTasksGen) { (taskId: Int, values: JList[AnyRef], numTasks: Int) =>
- 0.until(numTasks).foreach { i =>
+ forAll(taskIdGen, valuesGen, numTasksGen) {(taskId: Int, values: JList[AnyRef], taskNum: Int) =>
+ 0.until(taskNum).foreach { i =>
when(grouping.chooseTasks(taskId, values)).thenReturn(List(new Integer(i)).asJava)
grouper.getPartitions(taskId, values) shouldBe List(i)
}
@@ -114,7 +115,17 @@ class GrouperSpec extends PropSpec with PropertyChecks with Matchers with Mockit
object GrouperSpec {
class Value(val i: Int) extends AnyRef {
+
override def toString: String = s"$i"
+
override def hashCode(): Int = i
+
+ override def equals(other: Any): Boolean = {
+ if (other.isInstanceOf[Value]) {
+ this.i == other.asInstanceOf[Value].i
+ } else {
+ false
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
index c75e92c..1a28694 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,21 +17,23 @@
*/
package io.gearpump.experiments.storm.util
-import java.util.{Map => JMap, List => JList}
+import java.util.{List => JList, Map => JMap}
+import scala.collection.JavaConverters._
+
import backtype.storm.generated.Grouping
-import io.gearpump._
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.streaming.MockUtil
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, PropSpec}
import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
-import scala.collection.JavaConverters._
+import io.gearpump._
+import io.gearpump.experiments.storm.topology.GearpumpTuple
+import io.gearpump.streaming.MockUtil
-class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+class StormOutputCollectorSpec
+ extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
val stormTaskId = 0
val streamIdGen = Gen.alphaStr
@@ -43,13 +45,15 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
(timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) =>
val targets = mock[JMap[String, JMap[String, Grouping]]]
val taskToComponent = mock[JMap[Integer, String]]
- val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer])]
+ val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
+ (Map[String, Array[Int]], JList[Integer])]
val targetPartitions = mock[Map[String, Array[Int]]]
val targetStormTaskIds = mock[JList[Integer]]
- when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, targetStormTaskIds))
+ when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions,
+ targetStormTaskIds))
val taskContext = MockUtil.mockTaskContext
- val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
- taskContext, LatestTime)
+ val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
+ targets, getTargetPartitionsFn, taskContext, LatestTime)
when(targets.containsKey(streamId)).thenReturn(false)
stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST
@@ -59,12 +63,11 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
stormOutputCollector.setTimestamp(timestamp)
stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds
verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
- case Message(tuple: GearpumpTuple, t) =>
- val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
- tuple == expected && t == timestamp
+ case Message(tuple: GearpumpTuple, t) =>
+ val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
+ tuple == expected && t == timestamp
}))
}
-
}
property("StormOutputCollector emit direct to a task") {
@@ -75,13 +78,15 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
val targets = mock[JMap[String, JMap[String, Grouping]]]
val taskToComponent = mock[JMap[Integer, String]]
when(taskToComponent.get(id)).thenReturn(target)
- val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer])]
+ val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
+ (Map[String, Array[Int]], JList[Integer])]
val targetPartitions = mock[Map[String, Array[Int]]]
val targetStormTaskIds = mock[JList[Integer]]
- when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, targetStormTaskIds))
+ when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions,
+ targetStormTaskIds))
val taskContext = MockUtil.mockTaskContext
- val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
- taskContext, LatestTime)
+ val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
+ targets, getTargetPartitionsFn, taskContext, LatestTime)
when(targets.containsKey(streamId)).thenReturn(false)
verify(taskContext, times(0)).output(anyObject[Message])
@@ -91,11 +96,14 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
stormOutputCollector.emitDirect(id, streamId, values)
val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index)
verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
- case Message(tuple: GearpumpTuple, t) =>
- val expected = new GearpumpTuple(values, stormTaskId, streamId, Map(target -> partitions))
- tuple == expected && t == timestamp
+ case Message(tuple: GearpumpTuple, t) => {
+ val expected = new GearpumpTuple(values, stormTaskId, streamId,
+ Map(target -> partitions))
+
+ val result = tuple == expected && t == timestamp
+ result
+ }
}))
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
index f42b920..d8a29e8 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,20 +19,20 @@
package io.gearpump.experiments.storm.util
import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
+import scala.collection.JavaConverters._
import akka.actor.ExtendedActorSystem
import backtype.storm.utils.Utils
import com.esotericsoftware.kryo.Kryo
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.streaming.MockUtil
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-import scala.collection.JavaConverters._
+import io.gearpump.cluster.UserConfig
+import io.gearpump.experiments.storm.topology.GearpumpTuple
+import io.gearpump.experiments.storm.util.StormConstants._
+import io.gearpump.streaming.MockUtil
class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
@@ -44,7 +44,7 @@ class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers
val stormConfig = Utils.readDefaultConfig.asInstanceOf[JMap[AnyRef, AnyRef]]
val config = UserConfig.empty.withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, stormConfig)
serializerPool.init(system, config)
- serializerPool.get shouldBe a [StormSerializer]
+ serializerPool.get shouldBe a[StormSerializer]
}
property("StormSerializer should serialize and deserialize GearpumpTuple") {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
index 610b701..a9b93c9 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,27 +20,25 @@ package io.gearpump.experiments.storm.util
import java.lang.{Boolean => JBoolean, Long => JLong}
import java.util.{HashMap => JHashMap, Map => JMap}
+import scala.collection.JavaConverters._
import backtype.storm.Config
import backtype.storm.generated.StormTopology
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.experiments.storm.util.StormUtil._
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.TaskId
import org.apache.storm.shade.org.json.simple.JSONValue
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+import io.gearpump.cluster.UserConfig
+import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
+import io.gearpump.experiments.storm.util.StormConstants._
+import io.gearpump.experiments.storm.util.StormUtil._
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.TaskId
class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
property("convert Storm task ids to gearpump TaskIds and back") {
val idGen = Gen.chooseNum[Int](0, Int.MaxValue)
forAll(idGen) { (stormTaskId: Int) =>
@@ -60,23 +58,23 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock
val topology = TopologyUtil.getTestTopology
implicit val actorSystem = taskContext.system
val userConfig = UserConfig.empty
- .withValue[StormTopology](STORM_TOPOLOGY, topology)
- .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef])
- topology.get_spouts foreach { case (spoutId, _) =>
+ .withValue[StormTopology](STORM_TOPOLOGY, topology)
+ .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef])
+ topology.get_spouts.asScala.foreach { case (spoutId, _) =>
val config = userConfig.withString(STORM_COMPONENT, spoutId)
val component = getGearpumpStormComponent(taskContext, config)(taskContext.system)
- component shouldBe a [GearpumpSpout]
+ component shouldBe a[GearpumpSpout]
}
- topology.get_bolts foreach { case (boltId, _) =>
+ topology.get_bolts.asScala.foreach { case (boltId, _) =>
val config = userConfig.withString(STORM_COMPONENT, boltId)
val component = getGearpumpStormComponent(taskContext, config)(taskContext.system)
- component shouldBe a [GearpumpBolt]
+ component shouldBe a[GearpumpBolt]
}
}
property("parse json to map") {
val mapGen = Gen.listOf[String](Gen.alphaStr)
- .map(_.map(s => (s, s)).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]])
+ .map(_.map(s => (s, s)).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]])
forAll(mapGen) { (map: JMap[AnyRef, AnyRef]) =>
parseJsonStringToMap(JSONValue.toJSONString(map)) shouldBe map
@@ -86,7 +84,7 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock
forAll(invalidJsonGen) { (invalidJson: String) =>
val map = parseJsonStringToMap(invalidJson)
map shouldBe empty
- map shouldBe a [JMap[_, _]]
+ map shouldBe a[JMap[_, _]]
}
}
@@ -109,7 +107,7 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock
forAll(Gen.alphaStr) { (s: String) =>
conf.put(name, s)
- an [IllegalArgumentException] should be thrownBy getInt(conf, name)
+ an[IllegalArgumentException] should be thrownBy getInt(conf, name)
}
}
@@ -127,7 +125,7 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock
forAll(Gen.alphaStr) { (s: String) =>
conf.put(name, s)
- an [IllegalArgumentException] should be thrownBy getBoolean(conf, name)
+ an[IllegalArgumentException] should be thrownBy getBoolean(conf, name)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
index 24cce69..8491786 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
index 989cfaa..6618b48 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
index 6c38bde..af871ab 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,11 +19,13 @@
package io.gearpump.experiments.yarn.appmaster
import com.typesafe.config.Config
+
import io.gearpump.cluster.main.{Master, Worker}
import io.gearpump.experiments.yarn.Constants._
import io.gearpump.transport.HostPort
import io.gearpump.util.Constants
+/** Command to start a YARN container */
trait Command {
def get: String
override def toString: String = get
@@ -32,22 +34,26 @@ trait Command {
abstract class AbstractCommand extends Command {
protected def config: Config
def version: String
- def classPath = Array(
- s"conf",
- s"pack/$version/conf",
- s"pack/$version/lib/daemon/*",
- s"pack/$version/lib/*"
- )
+ def classPath: Array[String] = {
+ Array(
+ s"conf",
+ s"pack/$version/conf",
+ s"pack/$version/lib/daemon/*",
+ s"pack/$version/lib/*"
+ )
+ }
- protected def buildCommand(java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String]):String = {
+ protected def buildCommand(
+ java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String])
+ : String = {
val exe = config.getString(java)
s"$exe -cp ${classPath.mkString(":")}:" +
"$CLASSPATH " + properties.mkString(" ") +
- s" $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr"
+ s" $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr"
}
- protected def clazz(any: AnyRef) : String = {
+ protected def clazz(any: AnyRef): String = {
val name = any.getClass.getName
if (name.endsWith("$")) {
name.dropRight(1)
@@ -57,10 +63,11 @@ abstract class AbstractCommand extends Command {
}
}
-case class MasterCommand(config: Config, version: String, masterAddr: HostPort) extends AbstractCommand {
+case class MasterCommand(config: Config, version: String, masterAddr: HostPort)
+ extends AbstractCommand {
def get: String = {
- val masterArguments = Array(s"-ip ${masterAddr.host}", s"-port ${masterAddr.port}")
+ val masterArguments = Array(s"-ip ${masterAddr.host}", s"-port ${masterAddr.port}")
val properties = Array(
s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}",
@@ -74,7 +81,8 @@ case class MasterCommand(config: Config, version: String, masterAddr: HostPort)
}
}
-case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String) extends AbstractCommand {
+case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String)
+ extends AbstractCommand {
def get: String = {
val properties = Array(
@@ -85,11 +93,12 @@ case class WorkerCommand(config: Config, version: String, masterAddr: HostPort,
s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}",
s"-D${Constants.GEARPUMP_HOSTNAME}=$workerHost")
- buildCommand(WORKER_COMMAND, properties, clazz(Worker), Array.empty[String])
+ buildCommand(WORKER_COMMAND, properties, clazz(Worker), Array.empty[String])
}
}
-case class AppMasterCommand(config: Config, version: String, args: Array[String]) extends AbstractCommand {
+case class AppMasterCommand(config: Config, version: String, args: Array[String])
+ extends AbstractCommand {
override val classPath = Array(
"conf",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
index 92f85b6..6dd5011 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,28 +18,30 @@
package io.gearpump.experiments.yarn.appmaster
+import scala.concurrent.Future
+
import akka.actor._
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
+
import io.gearpump.cluster.ClusterConfig
import io.gearpump.services.main.Services
import io.gearpump.transport.HostPort
import io.gearpump.util.{ActorUtil, Constants, LogUtil}
-import scala.concurrent.Future
-
trait UIFactory {
def props(masters: List[HostPort], host: String, port: Int): Props
}
+/** Wrapper of UI server */
class UIService(masters: List[HostPort], host: String, port: Int) extends Actor {
private val LOG = LogUtil.getLogger(getClass)
private val supervisor = ActorUtil.getFullPath(context.system, context.parent.path)
private var configFile: java.io.File = null
- implicit val dispatcher = context.dispatcher
+ private implicit val dispatcher = context.dispatcher
- override def postStop: Unit = {
+ override def postStop(): Unit = {
if (configFile != null) {
configFile.delete()
configFile = null
@@ -51,22 +53,20 @@ class UIService(masters: List[HostPort], host: String, port: Int) extends Actor
}
override def preStart(): Unit = {
- Future(start)
+ Future(start())
}
- def start: Unit = {
+ def start(): Unit = {
val mastersArg = masters.mkString(",")
LOG.info(s"Launching services -master $mastersArg")
configFile = java.io.File.createTempFile("uiserver", ".conf")
-
val config = context.system.settings.config.
withValue(Constants.GEARPUMP_SERVICE_HOST, ConfigValueFactory.fromAnyRef(host)).
withValue(Constants.GEARPUMP_SERVICE_HTTP, ConfigValueFactory.fromAnyRef(port.toString)).
withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(host))
-
ClusterConfig.saveConfig(config, configFile)
val master = masters.head
@@ -75,9 +75,10 @@ class UIService(masters: List[HostPort], host: String, port: Int) extends Actor
launch(supervisor, master.host, master.port, configFile.toString)
}
+ // Launch the UI server
def launch(supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
Services.main(Array("-supervisor", supervisor, "-master", s"$masterHost:$masterPort"
- , "-conf", configFile))
+ , "-conf", configFile))
}
override def receive: Receive = {
@@ -86,7 +87,7 @@ class UIService(masters: List[HostPort], host: String, port: Int) extends Actor
}
}
-object UIService extends UIFactory{
+object UIService extends UIFactory {
override def props(masters: List[HostPort], host: String, port: Int): Props = {
Props(new UIService(masters, host, port))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
index f8982ce..1df7fb9 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,10 +20,17 @@ package io.gearpump.experiments.yarn.appmaster
import java.io.IOException
import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.ConfigValueFactory
+import org.apache.commons.httpclient.HttpClient
+import org.apache.commons.httpclient.methods.GetMethod
+import org.slf4j.Logger
+
import io.gearpump.cluster.ClientToMaster._
import io.gearpump.cluster.ClusterConfig
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
@@ -32,13 +39,16 @@ import io.gearpump.experiments.yarn.glue.Records._
import io.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig}
import io.gearpump.transport.HostPort
import io.gearpump.util._
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
-import org.slf4j.Logger
+/**
+ * YARN AppMaster. YARN AppMaster is responsible to start Gearpump masters, workers, UI server as
+ * YARN containers.
+ *
+ * NOTE: It is different with Gearpump AppMaster. Gearpump AppMaster is a sub-process of worker.
+ */
class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
- packagePath: String, hdfsConfDir: String,
- uiFactory: UIFactory)
+ packagePath: String, hdfsConfDir: String,
+ uiFactory: UIFactory)
extends Actor {
private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -47,11 +57,11 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
private var uiStarted = false
private val host = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
- val port = Util.findFreePort.get
+ private val port = Util.findFreePort().get
private val trackingURL = "http://" + host + ":" + port
- //TODO: for now, only one master is supported.
+ // TODO: for now, only one master is supported.
private val masterCount = 1
private val masterMemory = akkaConf.getString(MASTER_MEMORY).toInt
private val masterVCores = akkaConf.getString(MASTER_VCORES).toInt
@@ -67,7 +77,7 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
def receive: Receive = null
- private def registerAppMaster: Unit = {
+ private def registerAppMaster(): Unit = {
val target = host + ":" + port
rmClient.registerAppMaster(host, port, trackingURL)
}
@@ -75,7 +85,7 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
registerAppMaster
context.become(waitForAppMasterRegistered)
- import YarnAppMaster._
+ import io.gearpump.experiments.yarn.appmaster.YarnAppMaster._
def waitForAppMasterRegistered: Receive = {
case AppMasterRegistered =>
@@ -86,15 +96,16 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
private def startingMasters(remain: Int, masters: List[MasterInfo]): Receive = box {
case ContainersAllocated(containers) =>
- LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: " + containers.size)
+ LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: "
+ + containers.size)
val count = Math.min(containers.length, remain)
- val newMasters = (0 until count).toList.map{index =>
+ val newMasters = (0 until count).toList.map { index =>
val container = containers(index)
MasterInfo(container.getId, container.getNodeId, launchMaster(container))
}
- // stop un-used containers
- containers.drop(count).map{container =>
+ // Stops un-used containers
+ containers.drop(count).map { container =>
nmClient.stopContainer(container.getId, container.getNodeId)
}
@@ -113,26 +124,28 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
onError orElse receive orElse unHandled
}
- private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo]): Receive =
+ private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo])
+ : Receive = {
box {
case ContainersAllocated(containers) =>
- LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), count: " + containers.size)
+ LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), " +
+ s"count: " + containers.size)
val count = Math.min(containers.length, remain)
- val newWorkers = (0 until count).toList.map{index =>
+ val newWorkers = (0 until count).toList.map { index =>
val container = containers(index)
launchWorker(container, masters)
WorkerInfo(container.getId, container.getNodeId)
}
- // stop un-used containers
- containers.drop(count).map{container =>
+ // Stops un-used containers
+ containers.drop(count).map { container =>
nmClient.stopContainer(container.getId, container.getNodeId)
}
context.become(startingWorkers(remain, masters, workers ++ newWorkers))
case ContainerStarted(containerId) =>
LOG.info(s"ContainerStarted: container $containerId started for worker(remain=$remain)")
- // the last one
+ // The last one
if (remain > 1) {
context.become(startingWorkers(remain - 1, masters, workers))
} else {
@@ -143,21 +156,22 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
context.become(service(effectiveConfig(masters.map(_.host)), masters, workers))
}
}
-
- import scala.collection.JavaConversions._
+ }
private def effectiveConfig(masters: List[HostPort]): Config = {
val masterList = masters.map(pair => s"${pair.host}:${pair.port}")
val config = context.system.settings.config
- config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS, ConfigValueFactory.fromIterable(masterList))
+ config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
+ ConfigValueFactory.fromIterable(masterList.asJava))
}
private def onError: Receive = {
case ContainersCompleted(containers) =>
- //TODO: we should recover the failed container from this...
+ // TODO: we should recover the failed container from this...
containers.foreach { status =>
if (status.getExitStatus != 0) {
- LOG.error(s"ContainersCompleted: container ${status.getContainerId} failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}")
+ LOG.error(s"ContainersCompleted: container ${status.getContainerId}" +
+ s" failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}")
} else {
LOG.info(s"ContainersCompleted: container ${status.getContainerId} completed")
}
@@ -178,22 +192,24 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
self ! ShutdownApplication
}
- private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo]): Receive = box {
+ private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo])
+ : Receive = box {
case GetActiveConfig(clientHost) =>
LOG.info("GetActiveConfig: Get active configuration for client: " + clientHost)
- val filtered = ClusterConfig.filterOutDefaultConfig(config.withValue(Constants.GEARPUMP_HOSTNAME,
- ConfigValueFactory.fromAnyRef(clientHost)))
+ val filtered = ClusterConfig.filterOutDefaultConfig(
+ config.withValue(Constants.GEARPUMP_HOSTNAME,
+ ConfigValueFactory.fromAnyRef(clientHost)))
sender ! ActiveConfig(filtered)
case QueryVersion =>
LOG.info("QueryVersion")
sender ! Version(Util.version)
case QueryClusterInfo =>
LOG.info("QueryClusterInfo")
- val masterContainers = masters.map{master =>
+ val masterContainers = masters.map { master =>
master.id.toString + s"(${master.nodeId.toString})"
}
- val workerContainers = workers.map{worker=>
+ val workerContainers = workers.map { worker =>
worker.id.toString + s"(${worker.nodeId.toString})"
}
sender ! ClusterInfo(masterContainers, workerContainers)
@@ -242,7 +258,7 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
LOG.info(s"Launch Master on container " + container.getNodeHttpAddress)
val host = container.getNodeId.getHost
- val port = Util.findFreePort.get
+ val port = Util.findFreePort().get
LOG.info("=============PORT" + port)
val masterCommand = MasterCommand(akkaConf, rootPath, HostPort(host, port))
@@ -297,7 +313,7 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser {
nmClient, packagePath, confDir, UIService)))
val daemon = system.actorOf(Props(new Daemon(appMaster)))
- system.awaitTermination()
+ Await.result(system.whenTerminated, Duration.Inf)
LOG.info("YarnAppMaster is shutdown")
}
@@ -307,8 +323,9 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser {
override def receive: Actor.Receive = {
case Terminated(actor) =>
if (actor.compareTo(appMaster) == 0) {
- LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, shutting down current ActorSystem")
- context.system.shutdown()
+ LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, " +
+ s"shutting down current ActorSystem")
+ context.system.terminate()
context.stop(self)
}
}
@@ -353,7 +370,7 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser {
var status = client.executeMethod(get)
if (status != 200) {
- // sleep a little bit, and try again
+ // Sleeps a little bit, and try again
Thread.sleep(3000)
status = client.executeMethod(get)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
index 9f30570..49ec3a0 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,18 @@
package io.gearpump.experiments.yarn.client
import java.io.IOException
+import scala.util.Try
import akka.actor.{ActorRef, ActorSystem}
-import io.gearpump.experiments.yarn.glue.Records.ApplicationId
-import io.gearpump.experiments.yarn.glue.YarnClient
-import io.gearpump.util.{AkkaHelper, LogUtil}
import org.apache.commons.httpclient.HttpClient
import org.apache.commons.httpclient.methods.GetMethod
-import org.slf4j.Logger
-import scala.util.Try
+import io.gearpump.experiments.yarn.glue.Records.ApplicationId
+import io.gearpump.experiments.yarn.glue.YarnClient
+import io.gearpump.util.{AkkaHelper, LogUtil}
/**
- * Resolve AppMaster ActorRef
+ * Resolves AppMaster ActorRef
*/
class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
val LOG = LogUtil.getLogger(getClass)
@@ -67,7 +66,8 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
index += 1
val tryConnect = Try(fun)
if (tryConnect.isFailure) {
- Console.err.println(s"Failed to connect YarnAppMaster(tried $index)... " + tryConnect.failed.get.getMessage)
+ LOG.error(s"Failed to connect YarnAppMaster(tried $index)... " +
+ tryConnect.failed.get.getMessage)
} else {
result = tryConnect.get
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
index 2c3f99e..7c3bc38 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,10 +16,12 @@
* limitations under the License.
*/
package io.gearpump.experiments.yarn.client
-import io.gearpump.util.LogUtil
import org.slf4j.Logger
-object Client {
+import io.gearpump.util.LogUtil
+
+/** Command line tool to launch a Gearpump cluster on YARN, and also to manage Gearpump cluster */
+object Client {
private val LOG: Logger = LogUtil.getLogger(getClass)
val LAUNCH = "launch"
@@ -27,14 +29,16 @@ object Client {
val commands = Map(LAUNCH -> LaunchCluster) ++
ManageCluster.commands.map(key => (key, ManageCluster)).toMap
- def usage: Unit = {
+ def usage(): Unit = {
val keys = commands.keys.toList.sorted
+ // scalastyle:off println
Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+ // scalastyle:on println
}
- def main(args: Array[String]) = {
+ def main(args: Array[String]): Unit = {
if (args.length == 0) {
- usage
+ usage()
} else {
val key = args(0)
val command = commands.get(key)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
index 0ff4f47..ea5e707 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,9 +20,13 @@ package io.gearpump.experiments.yarn.client
import java.io.{File, IOException, OutputStreamWriter}
import java.net.InetAddress
import java.util.zip.ZipInputStream
-import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigValueFactory}
+import org.slf4j.Logger
+
import io.gearpump.cluster.ClusterConfig
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.experiments.yarn
@@ -33,13 +37,9 @@ import io.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource}
import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
import io.gearpump.util.ActorUtil.askActor
import io.gearpump.util.{AkkaApp, LogUtil, Util}
-import org.slf4j.Logger
-
-import scala.concurrent.Await
/**
* Launch Gearpump on YARN
- *
*/
class LaunchCluster(
akka: Config,
@@ -50,7 +50,7 @@ class LaunchCluster(
appMasterResolver: AppMasterResolver,
version: String = Util.version) {
- import yarn.Constants._
+ import io.gearpump.experiments.yarn.Constants._
private implicit val dispatcher = actorSystem.dispatcher
private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -59,13 +59,13 @@ class LaunchCluster(
private val memory = akka.getString(APPMASTER_MEMORY).toInt
private val vcore = akka.getString(APPMASTER_VCORES).toInt
-
def submit(appName: String, packagePath: String): ApplicationId = {
LOG.info("Starting AM")
- // first step, check the version, to make sure local version matches remote version
+ // First step, check the version, to make sure local version matches remote version
if (!packagePath.endsWith(".zip")) {
- throw new IOException(s"YarnClient only support .zip distribution package, now it is ${packagePath}. Please download the zip " +
+ throw new IOException(s"YarnClient only support .zip distribution package," +
+ s" now it is ${packagePath}. Please download the zip " +
"package from website or use sbt assembly packArchiveZip to build one.")
}
@@ -76,24 +76,30 @@ class LaunchCluster(
val rootEntry = rootEntryPath(zip = packagePath)
if (!rootEntry.contains(version)) {
- throw new IOException(s"Check version failed! Local gearpump binary version $version doesn't match with remote path $packagePath")
+ throw new IOException(s"Check version failed! Local gearpump binary" +
+ s" version $version doesn't match with remote path $packagePath")
}
val resource = Resource.newInstance(memory, vcore)
val appId = yarnClient.createApplication
- // will upload the configs to HDFS home directory of current user.
+ // uploads the configs to HDFS home directory of current user.
val configPath = uploadConfigToHDFS(appId)
- val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath", s"-package $packagePath"))
+ val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath",
+ s"-package $packagePath"))
yarnClient.submit(appName, appId, command.get, resource, queue, packagePath, configPath)
LOG.info("Waiting application to finish...")
val report = yarnClient.awaitApplication(appId, LaunchCluster.TIMEOUT_MILLISECONDS)
- LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} at ${report.getFinishTime}, info: ${report.getDiagnostics}")
+ LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} " +
+ s"at ${report.getFinishTime}, info: ${report.getDiagnostics}")
+
+ // scalastyle:off println
Console.println("================================================")
Console.println("==Application Id: " + appId)
+ // scalastyle:on println
appId
}
@@ -103,21 +109,20 @@ class LaunchCluster(
val appMaster = appMasterResolver.resolve(appId)
LOG.info(s"appMaster=${appMaster.path} host=$host")
val future = askActor[ActiveConfig](appMaster, GetActiveConfig(host)).map(_.config)
- import scala.concurrent.duration._
- future.map{ config =>
- val out = new File(output)
+ future.map { config =>
+ val out = new File(output)
ClusterConfig.saveConfig(config, out)
out
}
}
private def uploadConfigToHDFS(appId: ApplicationId): String = {
- // will use personal home directory so that it will not conflict with other users
+ // Uses personal home directory so that it will not conflict with other users
// conf path pattern: /user/<userid>/.gearpump_application_<timestamp>_<id>/conf
val confDir = s"${fs.getHomeDirectory}/.gearpump_${appId}/conf/"
LOG.info(s"Uploading configuration files to remote HDFS(under $confDir)...")
- // copy config from local to remote.
+ // Copies config from local to remote.
val remoteConfFile = s"$confDir/gear.conf"
var out = fs.create(remoteConfFile)
var writer = new OutputStreamWriter(out)
@@ -127,14 +132,14 @@ class LaunchCluster(
writer.write(cleanedConfig.root().render())
writer.close()
- // save yarn-site.xml to remote
+ // Saves yarn-site.xml to remote
val yarn_site_xml = s"$confDir/yarn-site.xml"
out = fs.create(yarn_site_xml)
writer = new OutputStreamWriter(out)
yarnConf.writeXml(writer)
writer.close()
- // save log4j.properties to remote
+ // Saves log4j.properties to remote
val log4j_properties = s"$confDir/log4j.properties"
val log4j = LogUtil.loadConfiguration
out = fs.create(log4j_properties)
@@ -164,17 +169,21 @@ object LaunchCluster extends AkkaApp with ArgumentsParser {
}
override val options: Array[(String, CLIOption[Any])] = Array(
- PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false),
- NAME -> CLIOption[String]("<Application name showed in YARN>", required = false, defaultValue = Some("Gearpump")),
- VERBOSE -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)),
- OUTPUT -> CLIOption("<output path for configuration file>", required = false, defaultValue = None)
+ PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. " +
+ "If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false),
+ NAME -> CLIOption[String]("<Application name showed in YARN>", required = false,
+ defaultValue = Some("Gearpump")),
+ VERBOSE -> CLIOption("<print verbose log on console>", required = false,
+ defaultValue = Some(false)),
+ OUTPUT -> CLIOption("<output path for configuration file>", required = false,
+ defaultValue = None)
)
private val TIMEOUT_MILLISECONDS = 30 * 1000
override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
val parsed = parse(args)
if (parsed.getBoolean(VERBOSE)) {
- LogUtil.verboseLogToConsole
+ LogUtil.verboseLogToConsole()
}
val yarnConfig = new YarnConfig()
@@ -184,24 +193,27 @@ object LaunchCluster extends AkkaApp with ArgumentsParser {
val actorSystem = ActorSystem("launchCluster", akkaConf)
val appMasterResolver = new AppMasterResolver(yarnClient, actorSystem)
- val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs, actorSystem, appMasterResolver)
+ val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs,
+ actorSystem, appMasterResolver)
val name = parsed.getString(NAME)
val appId = client.submit(name, akkaConf.getString(Constants.PACKAGE_PATH))
if (parsed.exists(OUTPUT)) {
import scala.concurrent.duration._
- Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)), TIMEOUT_MILLISECONDS milliseconds)
+ Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)),
+ TIMEOUT_MILLISECONDS.milliseconds)
}
- yarnClient.stop
- actorSystem.shutdown()
- actorSystem.awaitTermination()
+ yarnClient.stop()
+ actorSystem.terminate()
+ Await.result(actorSystem.whenTerminated, Duration.Inf)
}
private def updateConf(akka: Config, parsed: ParseResult): Config = {
if (parsed.exists(PACKAGE)) {
- akka.withValue(Constants.PACKAGE_PATH, ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE)))
+ akka.withValue(Constants.PACKAGE_PATH,
+ ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE)))
} else {
akka
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
index 20fd13e..3e50abe 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,8 +20,10 @@ package io.gearpump.experiments.yarn.client
import java.io.{File, IOException}
import java.net.InetAddress
+import scala.concurrent.{Await, Future}
import akka.actor.{ActorRef, ActorSystem}
+
import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
import io.gearpump.cluster.ClusterConfig
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
@@ -31,18 +33,23 @@ import io.gearpump.experiments.yarn.glue.{YarnClient, YarnConfig}
import io.gearpump.util.ActorUtil.askActor
import io.gearpump.util.{AkkaApp, LogUtil}
-import scala.concurrent.{Await, Future}
-
+/** Manage current Gearpump cluster on YARN */
class ManageCluster(appId: ApplicationId, appMaster: ActorRef, system: ActorSystem) {
- import ManageCluster._
+ import io.gearpump.experiments.yarn.client.ManageCluster._
private val host = InetAddress.getLocalHost.getHostName
implicit val dispatcher = system.dispatcher
def getConfig: Future[ActiveConfig] = askActor[ActiveConfig](appMaster, GetActiveConfig(host))
def version: Future[Version] = askActor[Version](appMaster, QueryVersion)
- def addWorker(count: Int): Future[CommandResult] = askActor[CommandResult](appMaster, AddWorker(count))
- def removeWorker(worker: String): Future[CommandResult] = askActor[CommandResult](appMaster, RemoveWorker(worker))
+ def addWorker(count: Int): Future[CommandResult] = {
+ askActor[CommandResult](appMaster, AddWorker(count))
+ }
+
+ def removeWorker(worker: String): Future[CommandResult] = {
+ askActor[CommandResult](appMaster, RemoveWorker(worker))
+ }
+
def shutdown: Future[CommandResult] = askActor[CommandResult](appMaster, Kill)
def queryClusterInfo: Future[ClusterInfo] = askActor[ClusterInfo](appMaster, QueryClusterInfo)
@@ -91,18 +98,23 @@ object ManageCluster extends AkkaApp with ArgumentsParser {
val APPID = "appid"
val VERBOSE = "verbose"
- val commands = List(GET_CONFIG,ADD_WORKER, REMOVE_WORKER, KILL,VERSION, QUERY)
+ val commands = List(GET_CONFIG, ADD_WORKER, REMOVE_WORKER, KILL, VERSION, QUERY)
import scala.concurrent.duration._
- val TIME_OUT_SECONDS = 30 seconds
+ val TIME_OUT_SECONDS = 30.seconds
override val options: Array[(String, CLIOption[Any])] = Array(
COMMAND -> CLIOption[String](s"<${commands.mkString("|")}>", required = true),
- APPID ->CLIOption[String]("<Application id, format: application_timestamp_id>", required = true),
- COUNT -> CLIOption("<how many instance to add or remove>", required = false, defaultValue = Some(1)),
- VERBOSE -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)),
- OUTPUT -> CLIOption("<output path for configuration file>", required = false, defaultValue = Some("")),
- CONTAINER -> CLIOption("<container id for master or worker>", required = false, defaultValue = Some(""))
+ APPID -> CLIOption[String]("<Application id, format: application_timestamp_id>",
+ required = true),
+ COUNT -> CLIOption("<how many instance to add or remove>", required = false,
+ defaultValue = Some(1)),
+ VERBOSE -> CLIOption("<print verbose log on console>", required = false,
+ defaultValue = Some(false)),
+ OUTPUT -> CLIOption("<output path for configuration file>", required = false,
+ defaultValue = Some("")),
+ CONTAINER -> CLIOption("<container id for master or worker>", required = false,
+ defaultValue = Some(""))
)
override def main(akkaConf: Config, args: Array[String]): Unit = {
@@ -113,7 +125,7 @@ object ManageCluster extends AkkaApp with ArgumentsParser {
val parsed = parse(args)
if (parsed.getBoolean(VERBOSE)) {
- LogUtil.verboseLogToConsole
+ LogUtil.verboseLogToConsole()
}
val appId = parseAppId(parsed.getString(APPID))
@@ -128,9 +140,11 @@ object ManageCluster extends AkkaApp with ArgumentsParser {
val command = parsed.getString(COMMAND)
val result = manager.command(command, parsed)
+ // scalastyle:off println
Console.println(Await.result(result, TIME_OUT_SECONDS))
- system.shutdown()
- system.awaitTermination()
+ // scalastyle:on println
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
def parseAppId(str: String): ApplicationId = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
index 93cee28..6b3385f 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
@@ -1,4 +1,4 @@
-/*-
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,8 +20,8 @@ package io.gearpump.experiments.yarn.glue
import java.io.File
import java.nio.ByteBuffer
+import scala.collection.JavaConverters._
-import io.gearpump.util.LogUtil
import org.apache.hadoop.fs.{FileSystem => YarnFileSystem, Path}
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.mapreduce.security.TokenCache
@@ -32,18 +32,19 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
import org.slf4j.Logger
-import scala.collection.JavaConversions._
+import io.gearpump.util.LogUtil
-private [glue]
+private[glue]
object ContainerLaunchContext {
private val LOG: Logger = LogUtil.getLogger(getClass)
- def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String): ContainerLaunchContext = {
+ def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String)
+ : ContainerLaunchContext = {
val context = Records.newRecord(classOf[ContainerLaunchContext])
- context.setCommands(Seq(command))
- context.setEnvironment(getAppEnv(yarnConf))
+ context.setCommands(Seq(command).asJava)
+ context.setEnvironment(getAppEnv(yarnConf).asJava)
context.setTokens(getToken(yarnConf, packagePath, configPath))
- context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath))
+ context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath).asJava)
context
}
@@ -60,7 +61,9 @@ object ContainerLaunchContext {
Map(Environment.CLASSPATH.name -> allPaths.map(_.trim).mkString(File.pathSeparator))
}
- private def getAMLocalResourcesMap(yarnConf: YarnConfiguration, packagePath: String, configPath: String): Map[String, LocalResource] = {
+ private def getAMLocalResourcesMap(
+ yarnConf: YarnConfiguration, packagePath: String, configPath: String)
+ : Map[String, LocalResource] = {
val fs = getFs(yarnConf)
Map(
@@ -70,8 +73,9 @@ object ContainerLaunchContext {
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION))
}
- private def newYarnAppResource(fs: YarnFileSystem, path: Path,
- resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = {
+ private def newYarnAppResource(
+ fs: YarnFileSystem, path: Path,
+ resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = {
val qualified = fs.makeQualified(path)
val status = fs.getFileStatus(qualified)
val resource = Records.newRecord(classOf[LocalResource])
@@ -83,7 +87,8 @@ object ContainerLaunchContext {
resource
}
- private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String): ByteBuffer = {
+ private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String)
+ : ByteBuffer = {
val credentials = UserGroupInformation.getCurrentUser.getCredentials
val dob = new DataOutputBuffer
val dirs = Array(new Path(packagePath), new Path(configPath))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
index e3dfb7f..acf09ac 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,11 +20,11 @@ package io.gearpump.experiments.yarn.glue
import java.io.{InputStream, OutputStream}
import java.net.ConnectException
+import scala.util.{Failure, Success, Try}
-import io.gearpump.util.LogUtil
import org.apache.hadoop.fs.Path
-import scala.util.{Success, Failure, Try}
+import io.gearpump.util.LogUtil
class FileSystem(yarnConfig: YarnConfig) {
@@ -33,26 +33,26 @@ class FileSystem(yarnConfig: YarnConfig) {
private def LOG = LogUtil.getLogger(getClass)
- def open(file: String): InputStream = exceptionHandler{
+ def open(file: String): InputStream = exceptionHandler {
val path = new Path(file)
fs.open(path)
}
- def create(file: String): OutputStream = exceptionHandler{
+ def create(file: String): OutputStream = exceptionHandler {
val path = new Path(file)
fs.create(path)
}
- def exists(file: String): Boolean = exceptionHandler{
+ def exists(file: String): Boolean = exceptionHandler {
val path = new Path(file)
fs.exists(path)
}
def name: String = {
- fs.getName
+ fs.getUri.toString
}
- def getHomeDirectory: String = {
+ def getHomeDirectory: String = {
fs.getHomeDirectory.toString
}
@@ -62,8 +62,10 @@ class FileSystem(yarnConfig: YarnConfig) {
case Success(v) => v
case Failure(ex) =>
if (ex.isInstanceOf[ConnectException]) {
- LOG.error("Please check whether we connect to the right HDFS file system, current file system is $name." +
- "\n. Please copy all configs under $HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, so that we can use the right File system.", ex)
+ LOG.error("Please check whether we connect to the right HDFS file system, " +
+ "current file system is $name." + "\n. Please copy all configs under " +
+ "$HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, " +
+ "so that we can use the right File system.", ex)
}
throw ex
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
index f0a5fc7..3e7e668 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,12 +22,13 @@ import java.nio.ByteBuffer
import akka.actor.ActorRef
import com.typesafe.config.Config
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource}
import org.apache.hadoop.yarn.client.api.async.NMClientAsync
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
+
+import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted
+import io.gearpump.experiments.yarn.glue.Records._
+import io.gearpump.util.LogUtil
/**
* Adapter for node manager client
*/
@@ -46,45 +47,49 @@ class NMClient(yarnConf: YarnConfig, config: Config) extends NMClientAsync.Callb
client.start()
}
- private [glue]
- override def onContainerStarted(containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer]) {
+ private[glue]
+ override def onContainerStarted(
+ containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer]) {
LOG.info(s"Container started : $containerId, " + allServiceResponse)
reportTo ! ContainerStarted(containerId)
}
- private [glue]
- override def onContainerStatusReceived(containerId: YarnContainerId, containerStatus: YarnContainerStatus) {
+ private[glue]
+ override def onContainerStatusReceived(
+ containerId: YarnContainerId, containerStatus: YarnContainerStatus) {
LOG.info(s"Container status received : $containerId, status $containerStatus")
}
- private [glue]
+ private[glue]
override def onContainerStopped(containerId: YarnContainerId) {
LOG.error(s"Container stopped : $containerId")
}
- private [glue]
+ private[glue]
override def onGetContainerStatusError(containerId: YarnContainerId, throwable: Throwable) {
LOG.error(s"Container exception : $containerId", throwable)
}
- private [glue]
+ private[glue]
override def onStartContainerError(containerId: YarnContainerId, throwable: Throwable) {
LOG.error(s"Container exception : $containerId", throwable)
}
- private [glue]
+ private[glue]
override def onStopContainerError(containerId: YarnContainerId, throwable: Throwable) {
LOG.error(s"Container exception : $containerId", throwable)
}
- def launchCommand(container: Container, command: String, packagePath: String, configPath: String): Unit = {
- LOG.info(s"Launching command : $command on container: ${container.getId}, host ip : ${container.getNodeId.getHost}")
+ def launchCommand(
+ container: Container, command: String, packagePath: String, configPath: String): Unit = {
+ LOG.info(s"Launching command : $command on container" +
+ s": ${container.getId}, host ip : ${container.getNodeId.getHost}")
val context = ContainerLaunchContext(yarnConf.conf, command, packagePath, configPath)
client.startContainerAsync(container, context)
}
def stopContainer(containerId: ContainerId, nodeId: NodeId): Unit = {
- LOG.info(s"Stop container ${containerId.toString} on node: ${nodeId.toString} " )
+ LOG.info(s"Stop container ${containerId.toString} on node: ${nodeId.toString} ")
client.stopContainerAsync(containerId, nodeId)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
index c1d26f6..7b9d83c 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,17 @@
package io.gearpump.experiments.yarn.glue
+import scala.collection.JavaConverters._
+
import akka.actor.ActorRef
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication}
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => YarnResource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
import org.slf4j.Logger
-import scala.collection.JavaConverters._
+import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication}
+import io.gearpump.experiments.yarn.glue.Records._
+import io.gearpump.util.LogUtil
/**
* Adapter for resource manager client
@@ -46,7 +47,7 @@ class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler {
}
private def startAMRMClient: AMRMClientAsync[ContainerRequest] = {
- val timeIntervalMs = 1000 //ms
+ val timeIntervalMs = 1000 // ms
val amrmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](timeIntervalMs, this)
amrmClient.init(yarnConf.conf)
amrmClient.start()
@@ -59,16 +60,19 @@ class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler {
private[glue]
override def onContainersAllocated(containers: java.util.List[YarnContainer]) {
- val newContainers = containers.asScala.toList.filterNot(container => allocatedContainers.contains(container.getId))
+ val newContainers = containers.asScala.toList.filterNot(container =>
+ allocatedContainers.contains(container.getId))
allocatedContainers ++= newContainers.map(_.getId)
LOG.info(s"New allocated ${newContainers.size} containers")
reportTo ! ContainersAllocated(newContainers.map(yarnContainerToContainer(_)))
}
private[glue]
- override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus]): Unit = {
+ override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus])
+ : Unit = {
LOG.info(s"Got response from RM. Completed containers=${completedContainers.size()}")
- reportTo ! ContainersCompleted(completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_)))
+ reportTo ! ContainersCompleted(
+ completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_)))
}
private[glue]