You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ap...@apache.org on 2016/04/11 20:26:38 UTC

[36/50] incubator-gearpump git commit: fix #1966 make Partitioner API Java compatible

fix #1966 make Partitioner API Java compatible


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/25aacb29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/25aacb29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/25aacb29

Branch: refs/heads/master
Commit: 25aacb29c73e0fccca2cfbdc4b13aae9204b2aa4
Parents: 9feb159
Author: huafengw <fv...@gmail.com>
Authored: Tue Feb 16 15:42:25 2016 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Wed Feb 17 15:25:53 2016 +0800

----------------------------------------------------------------------
 .../partitioner/BroadcastPartitioner.scala      | 11 +++++++--
 .../io/gearpump/partitioner/Partitioner.scala   |  4 +--
 .../streaming/examples/stock/QueryServer.scala  | 26 ++++++++++----------
 .../examples/transport/QueryServer.scala        | 18 +++++++-------
 .../storm/partitioner/StormPartitioner.scala    |  6 ++---
 .../storm/topology/GearpumpTuple.scala          |  2 +-
 .../experiments/storm/util/Grouper.scala        | 25 ++++++++++---------
 .../storm/util/StormOutputCollector.scala       | 10 ++++----
 .../partitioner/StormPartitionerSpec.scala      |  6 ++---
 .../storm/util/StormOutputCollectorSpec.scala   | 10 ++++----
 .../io/gearpump/services/MasterService.scala    | 22 +++++++----------
 .../streaming/appmaster/TaskManagerSpec.scala   | 18 ++++----------
 12 files changed, 77 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
index 07ce399..dba02ee 100644
--- a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
@@ -21,7 +21,14 @@ package io.gearpump.partitioner
 import io.gearpump.Message
 
 class BroadcastPartitioner extends MulticastPartitioner {
-  override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): List[Int] = {
-    (0 until partitionNum).toList
+  private var lastPartitionNum = -1
+  private var partitions = Array.empty[Int]
+
+  override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+    if (partitionNum != lastPartitionNum) {
+      partitions = (0 until partitionNum).toArray
+      lastPartitionNum = partitionNum
+    }
+    partitions
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
index 2478d3a..6285bb7 100644
--- a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
@@ -41,9 +41,9 @@ trait UnicastPartitioner extends Partitioner {
 }
 
 trait MulticastPartitioner extends Partitioner {
-  def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): List[Int]
+  def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int]
 
-  def getPartitions(msg: Message, partitionNum: Int): List[Int] = {
+  def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
     getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
index a705000..aa8541a 100644
--- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
+++ b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala
@@ -21,29 +21,29 @@ package io.gearpump.streaming.examples.stock
 
 import java.util.concurrent.TimeUnit
 
-import akka.actor.{Props, Actor}
-import akka.io.IO
-import io.gearpump.streaming.ProcessorId
-import io.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary, AppMaster}
-import AppMaster.{TaskActorRef, LookupTaskActorRef}
 import akka.actor.Actor._
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import akka.actor.{Actor, Props}
+import akka.io.IO
+import akka.pattern.ask
 import io.gearpump.Message
 import io.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest
 import io.gearpump.cluster.UserConfig
-import QueryServer.WebServer
-import akka.pattern.ask
+import io.gearpump.streaming.ProcessorId
+import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
+import io.gearpump.streaming.appmaster.{AppMaster, ProcessorSummary, StreamAppMasterSummary}
+import io.gearpump.streaming.examples.stock.QueryServer.WebServer
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
 import spray.can.Http
-import spray.http.{StatusCodes}
-import spray.routing.HttpService
-import upickle.default.{read, write}
+import spray.http.StatusCodes
 import spray.json._
+import spray.routing.HttpService
+import upickle.default.write
 
-import scala.concurrent.{Future, ExecutionContext}
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
 class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){
-  import taskContext.{appMaster, appId}
+  import taskContext.{appId, appMaster}
 
   import ExecutionContext.Implicits.global
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
index ce22648..5f91883 100644
--- a/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
+++ b/examples/streaming/transport/src/main/scala/io/gearpump/streaming/examples/transport/QueryServer.scala
@@ -19,31 +19,31 @@ package io.gearpump.streaming.examples.transport
 
 import java.util.concurrent.TimeUnit
 
-import akka.actor.{Props, Actor}
 import akka.actor.Actor._
+import akka.actor.{Actor, Props}
 import akka.io.IO
 import akka.pattern.ask
-import io.gearpump.streaming.{ProcessorId, StreamApplication, ProcessorDescription, DAG}
-import io.gearpump.streaming.appmaster.AppMaster
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
-import QueryServer.{GetAllRecords, WebServer}
 import io.gearpump.partitioner.PartitionerDescription
-import AppMaster.{TaskActorRef, LookupTaskActorRef}
+import io.gearpump.streaming.appmaster.AppMaster
+import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
+import io.gearpump.streaming.examples.transport.QueryServer.{GetAllRecords, WebServer}
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import io.gearpump.streaming.{DAG, ProcessorDescription, ProcessorId, StreamApplication}
 import io.gearpump.util.Graph
 import spray.can.Http
 import spray.http.StatusCodes
-import spray.routing.HttpService
-import upickle.default.{read, write}
 import spray.json._
+import spray.routing.HttpService
+import upickle.default.write
 
 import scala.concurrent.Future
 import scala.util.{Failure, Success}
 
 class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){
-  import taskContext.{appMaster, appId}
   import system.dispatcher
+  import taskContext.appMaster
 
   var inspector: (ProcessorId, ProcessorDescription) = null
   implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
index d2071ae..04ccb91 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
@@ -21,7 +21,7 @@ package io.gearpump.experiments.storm.partitioner
 import io.gearpump.Message
 import io.gearpump.experiments.storm.topology.GearpumpTuple
 import io.gearpump.experiments.storm.util.StormOutputCollector
-import io.gearpump.partitioner.{MulticastPartitioner, Partitioner}
+import io.gearpump.partitioner.{Partitioner, MulticastPartitioner}
 
 /**
  * this is a partitioner bound to a target Storm component
@@ -38,9 +38,9 @@ import io.gearpump.partitioner.{MulticastPartitioner, Partitioner}
  */
 private[storm] class StormPartitioner(target: String) extends MulticastPartitioner {
 
-  override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): List[Int] = {
+  override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
     val stormTuple = msg.msg.asInstanceOf[GearpumpTuple]
-    stormTuple.targetPartitions.getOrElse(target, List(Partitioner.UNKNOWN_PARTITION_ID))
+    stormTuple.targetPartitions.getOrElse(target, Array(Partitioner.UNKNOWN_PARTITION_ID))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
index a52c9c7..7662f36 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
@@ -34,7 +34,7 @@ private[storm] class GearpumpTuple(
     val values: JList[AnyRef],
     val sourceTaskId: Integer,
     val sourceStreamId: String,
-    @transient val targetPartitions: Map[String, List[Int]]) extends Serializable {
+    @transient val targetPartitions: Map[String, Array[Int]]) extends Serializable {
   /**
    * creates a Storm [[Tuple]] to be passed to a Storm component
    * this is needed for each incoming message

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
index 1ac5f71..d727b7a 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
@@ -38,14 +38,14 @@ sealed trait Grouper {
    * @param values storm tuple values
    * @return a list of gearpump partitions
    */
-  def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int]
+  def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int]
 }
 
 /**
  * GlobalGrouper always returns partition 0
  */
 class GlobalGrouper extends Grouper {
-  override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = List(0)
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = Array(0)
 }
 
 /**
@@ -55,9 +55,9 @@ class GlobalGrouper extends Grouper {
 class NoneGrouper(numTasks: Int) extends Grouper {
   private val random = new Random
 
-  override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = {
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
     val partition = StormUtil.mod(random.nextInt, numTasks)
-    List(partition)
+    Array(partition)
   }
 }
 
@@ -71,7 +71,7 @@ class ShuffleGrouper(numTasks: Int) extends Grouper {
   private var index = -1
   private var partitions = List.empty[Int]
 
-  override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = {
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
     index += 1
     if (partitions.isEmpty) {
       partitions = 0.until(numTasks).toList
@@ -80,7 +80,7 @@ class ShuffleGrouper(numTasks: Int) extends Grouper {
       index = 0
       partitions = random.shuffle(partitions)
     }
-    List(partitions(index))
+    Array(partitions(index))
   }
 }
 
@@ -92,10 +92,10 @@ class ShuffleGrouper(numTasks: Int) extends Grouper {
  */
 class FieldsGrouper(outFields: Fields, groupFields: Fields, numTasks: Int) extends Grouper {
 
-  override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = {
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
     val hash = outFields.select(groupFields, values).hashCode()
     val partition = StormUtil.mod(hash, numTasks)
-    List(partition)
+    Array(partition)
   }
 }
 
@@ -104,9 +104,10 @@ class FieldsGrouper(outFields: Fields, groupFields: Fields, numTasks: Int) exten
  * @param numTasks number of target tasks
  */
 class AllGrouper(numTasks: Int) extends Grouper {
+  val partitions = (0 until numTasks).toArray
 
-  override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = {
-    (0 until numTasks).toList
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+    partitions
   }
 }
 
@@ -120,8 +121,8 @@ class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper {
     grouping.prepare(topologyContext, globalStreamId, targetTasks)
   }
 
-  override def getPartitions(taskId: Int, values: JList[AnyRef]): List[Int] = {
-    grouping.chooseTasks(taskId, values).map(StormUtil.stormTaskIdToGearpump(_).index).toList
+  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
+    grouping.chooseTasks(taskId, values).map(StormUtil.stormTaskIdToGearpump(_).index).toArray
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
index 1abc246..c1df63b 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
@@ -66,12 +66,12 @@ object StormOutputCollector {
       targets: JMap[String, JMap[String, Grouping]],
       streamGroupers: Map[String, Grouper],
       componentToProcessorId: Map[String, ProcessorId],
-      values: JList[AnyRef]): (Map[String, List[Int]], JList[Integer]) ={
+      values: JList[AnyRef]): (Map[String, Array[Int]], JList[Integer]) ={
     val ret: JList[Integer] = new JArrayList[Integer](targets.size)
 
     @annotation.tailrec
     def getRecur(iter: JIterator[String],
-        accum: Map[String, List[Int]]): Map[String, List[Int]] = {
+        accum: Map[String, Array[Int]]): Map[String, Array[Int]] = {
       if (iter.hasNext) {
         val target = iter.next
         val grouper = streamGroupers(streamId)
@@ -85,7 +85,7 @@ object StormOutputCollector {
         accum
       }
     }
-    val targetPartitions = getRecur(targets.get(streamId).keySet().iterator, Map.empty[String, List[Int]])
+    val targetPartitions = getRecur(targets.get(streamId).keySet().iterator, Map.empty[String, Array[Int]])
     (targetPartitions, ret)
   }
 
@@ -156,7 +156,7 @@ class StormOutputCollector(
     stormTaskId: Int,
     taskToComponent: JMap[Integer, String],
     targets: JMap[String, JMap[String, Grouping]],
-    getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, List[Int]], JList[Integer]),
+    getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer]),
     val taskContext: TaskContext,
     private var timestamp: TimeStamp) {
   import io.gearpump.experiments.storm.util.StormOutputCollector._
@@ -196,7 +196,7 @@ class StormOutputCollector(
     if (targets.containsKey(streamId)) {
       val target = taskToComponent.get(id)
       val partition = stormTaskIdToGearpump(id).index
-      val targetPartitions = Map(target -> List(partition))
+      val targetPartitions = Map(target -> Array(partition))
       val tuple = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
       taskContext.output(Message(tuple, timestamp))
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
index ef66937..200807c 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
@@ -32,17 +32,17 @@ class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers {
   property("StormPartitioner should get partitions directed by message and target") {
     val idGen = Gen.chooseNum[Int](0, Int.MaxValue)
     val componentsGen = Gen.listOf[String](Gen.alphaStr).map(_.distinct).suchThat(_.size > 1)
-    val partitionsGen = Gen.listOf[Int](idGen).suchThat(_.nonEmpty).map(_.distinct.sorted)
+    val partitionsGen = Gen.listOf[Int](idGen).suchThat(_.nonEmpty).map(_.distinct.sorted.toArray)
     val tupleFactoryGen = for {
       values <- Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
       sourceTaskId <- idGen
       sourceStreamId <- Gen.alphaStr
-    } yield (targetPartitions: Map[String, List[Int]]) => {
+    } yield (targetPartitions: Map[String, Array[Int]]) => {
         new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, targetPartitions)
       }
 
     forAll(tupleFactoryGen, idGen, componentsGen, partitionsGen) {
-      (tupleFactory: Map[String, List[Int]] => GearpumpTuple, id: Int, components: List[String], partitions: List[Int]) =>
+      (tupleFactory: Map[String, Array[Int]] => GearpumpTuple, id: Int, components: List[String], partitions: Array[Int]) =>
         val currentPartitionId = id
         val targetPartitions = components.init.map(c => (c, partitions)).toMap
         val tuple = tupleFactory(targetPartitions)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
index 6f16eb1..c75e92c 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -43,8 +43,8 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
       (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) =>
         val targets = mock[JMap[String, JMap[String, Grouping]]]
         val taskToComponent = mock[JMap[Integer, String]]
-        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, List[Int]], JList[Integer])]
-        val targetPartitions = mock[Map[String, List[Int]]]
+        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer])]
+        val targetPartitions = mock[Map[String, Array[Int]]]
         val targetStormTaskIds = mock[JList[Integer]]
         when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, targetStormTaskIds))
         val taskContext = MockUtil.mockTaskContext
@@ -75,8 +75,8 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
         val targets = mock[JMap[String, JMap[String, Grouping]]]
         val taskToComponent = mock[JMap[Integer, String]]
         when(taskToComponent.get(id)).thenReturn(target)
-        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, List[Int]], JList[Integer])]
-        val targetPartitions = mock[Map[String, List[Int]]]
+        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer])]
+        val targetPartitions = mock[Map[String, Array[Int]]]
         val targetStormTaskIds = mock[JList[Integer]]
         when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, targetStormTaskIds))
         val taskContext = MockUtil.mockTaskContext
@@ -89,7 +89,7 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher
         when(targets.containsKey(streamId)).thenReturn(true)
         stormOutputCollector.setTimestamp(timestamp)
         stormOutputCollector.emitDirect(id, streamId, values)
-        val partitions = List(StormUtil.stormTaskIdToGearpump(id).index)
+        val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index)
         verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
           case Message(tuple: GearpumpTuple, t) =>
             val expected = new GearpumpTuple(values, stormTaskId, streamId, Map(target -> partitions))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
index aa90726..839c0ae 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
@@ -23,32 +23,28 @@ import java.io.{File, IOException}
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.Route
 import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
 import akka.http.scaladsl.unmarshalling.Unmarshaller._
-import akka.stream.{Materializer, ActorMaterializer}
-import com.typesafe.config.{ConfigRenderOptions, Config}
+import akka.stream.Materializer
+import com.typesafe.config.Config
 import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
-import io.gearpump.cluster.ClientToMaster.{ReadOption, QueryHistoryMetrics, QueryMasterConfig}
+import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ReadOption}
 import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, WorkerList}
 import io.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig, SubmitApplicationResultValue}
-import io.gearpump.cluster.{ClusterConfig, UserConfig}
 import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.AppSubmitter
 import io.gearpump.cluster.worker.WorkerSummary
+import io.gearpump.cluster.{ClusterConfig, UserConfig}
 import io.gearpump.jarstore.JarStoreService
+import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
 import io.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription}
 import io.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication}
-import io.gearpump.util.ActorUtil.{askActor, _}
-import io.gearpump.util.Constants._
-import io.gearpump.util.FileDirective._
-import io.gearpump.util.{Graph, Constants, Util, FileUtils}
 import io.gearpump.util.ActorUtil._
-import io.gearpump.services.MasterService.{SubmitApplicationRequest, BuiltinPartitioners}
+import io.gearpump.util.FileDirective._
+import io.gearpump.util.{Constants, Graph, Util}
 
 import scala.collection.JavaConversions._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success, Try}
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
 
 class MasterService(val master: ActorRef,
     val jarStore: JarStoreService, override val system: ActorSystem)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/25aacb29/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 4e4e96b..621455d 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -20,34 +20,26 @@ package io.gearpump.streaming.appmaster
 
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.testkit.TestProbe
-import io.gearpump.streaming.AppMasterToExecutor.{StartAllTasks, TaskLocationsReceived, StartDynamicDag, TaskLocationsReady, TaskRegistered, LaunchTasks}
-import io.gearpump.streaming.{ProcessorId, DAG, LifeTime, ProcessorDescription}
-import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask
-import io.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut
-import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAGSuccess, ChangeToNewDAG}
-import io.gearpump.streaming.appmaster.DagManager.{TaskLaunchData, GetLatestDAG, NewDAGDeployed, WatchChange}
-import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary, SetTaskManager, StartExecutorsTimeOut, StartExecutors}
-import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
-import io.gearpump.streaming.task.{StartTime, TaskContext, GetStartClock, Subscriber}
-import io.gearpump.{TimeStamp, Message}
 import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
 import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
 import io.gearpump.cluster.{AppJar, TestUtil, UserConfig}
 import io.gearpump.jarstore.FilePath
 import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
+import io.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, TaskRegistered}
 import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask
 import io.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut
 import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess}
 import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange}
-import io.gearpump.streaming.appmaster.ExecutorManager._
+import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary, SetTaskManager, StartExecutors, _}
 import io.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetail
 import io.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, Task2}
-import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
 import io.gearpump.streaming.executor.Executor.RestartTasks
-import io.gearpump.streaming.task._
+import io.gearpump.streaming.task.{StartTime, TaskContext, _}
+import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId}
 import io.gearpump.transport.HostPort
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph._
+import io.gearpump.{Message, TimeStamp}
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}