You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2017/01/20 08:47:57 UTC

[17/19] incubator-gearpump git commit: merge master into akka-streams branch

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
index d4b7871..55aa73f 100644
--- a/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.metrics
 
-import org.apache.gearpump.codahale.metrics.{Meter => CodaHaleMeter}
+import com.codahale.metrics.{Meter => CodaHaleMeter}
 
 /** See org.apache.gearpump.codahale.metrics.Meter */
 class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
index 1ee3798..3737361 100644
--- a/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 import akka.actor._
 import org.slf4j.Logger
 
-import org.apache.gearpump.codahale.metrics._
+import com.codahale.metrics._
 import org.apache.gearpump.metrics
 import org.apache.gearpump.util.LogUtil
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
index 94aa114..620dc61 100644
--- a/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
+++ b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
@@ -20,12 +20,11 @@ package org.apache.gearpump.metrics
 
 import java.net.InetSocketAddress
 import java.util.concurrent.TimeUnit
-import scala.concurrent.duration._
 
+import scala.concurrent.duration._
 import akka.actor.{Actor, ActorRef}
-
-import org.apache.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter}
-import org.apache.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter}
+import com.codahale.metrics.{MetricFilter, Slf4jReporter}
+import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
 import org.apache.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics}
 import org.apache.gearpump.metrics.MetricsReporterService.ReportTo
 import org.apache.gearpump.util.Constants._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
deleted file mode 100644
index 99cbcb6..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.apache.gearpump.Message
-
-/** Used by storm module to broadcast message to all downstream tasks  */
-class BroadcastPartitioner extends MulticastPartitioner {
-  private var lastPartitionNum = -1
-  private var partitions = Array.empty[Int]
-
-  override def getPartitions(
-      msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
-    if (partitionNum != lastPartitionNum) {
-      partitions = (0 until partitionNum).toArray
-      lastPartitionNum = partitionNum
-    }
-    partitions
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
deleted file mode 100644
index 5a3eec4..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.apache.gearpump.Message
-
-/**
- * Will have the same parallelism with last processor
- * And each task in current processor will co-locate with task of last processor
- */
-class CoLocationPartitioner extends UnicastPartitioner {
-  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    currentPartitionId
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
deleted file mode 100644
index ee684a9..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.apache.gearpump.Message
-
-/**
- * Only make sense when the message has implemented the hashCode()
- * Otherwise, it will use Object.hashCode(), which will not return
- * same hash code after serialization and deserialization.
- */
-class HashPartitioner extends UnicastPartitioner {
-  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
deleted file mode 100644
index d68fa65..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import scala.reflect.ClassTag
-
-import org.apache.commons.lang.SerializationUtils
-
-import org.apache.gearpump.Message
-
-/**
- * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task
- * of upstream processor A send to several tasks of downstream processor B.
- */
-sealed trait Partitioner extends Serializable
-
-/**
- * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does
- * ONE-task {@literal ->} ONE-task mapping.
- */
-trait UnicastPartitioner extends Partitioner {
-
-  /**
-   * Gets the SINGLE downstream processor task index to send message to.
-   *
-   * @param msg Message you want to send
-   * @param partitionNum How many tasks does the downstream processor have.
-   * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call.
-   *
-   * @return ONE task index of downstream processor.
-   */
-  def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int
-
-  def getPartition(msg: Message, partitionNum: Int): Int = {
-    getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
-  }
-}
-
-trait MulticastPartitioner extends Partitioner {
-
-  /**
-   * Gets a list of downstream processor task indexes to send message to.
-   *
-   * @param upstreamTaskIndex Current sender task's task index.
-   *
-   */
-  def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int]
-
-  def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
-    getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
-  }
-}
-
-sealed trait PartitionerFactory {
-
-  def name: String
-
-  def partitioner: Partitioner
-}
-
-/** Stores the Partitioner in an object. To use it, user need to deserialize the object */
-class PartitionerObject(private[this] val _partitioner: Partitioner)
-  extends PartitionerFactory with Serializable {
-
-  override def name: String = partitioner.getClass.getName
-
-  override def partitioner: Partitioner = {
-    SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
-  }
-}
-
-/** Store the partitioner in class Name, the user need to instantiate a new class */
-class PartitionerByClassName(partitionerClass: String)
-  extends PartitionerFactory with Serializable {
-
-  override def name: String = partitionerClass
-  override def partitioner: Partitioner = {
-    Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
-  }
-}
-
-/**
- * @param partitionerFactory How we construct a Partitioner.
- */
-case class PartitionerDescription(partitionerFactory: PartitionerFactory)
-
-object Partitioner {
-  val UNKNOWN_PARTITION_ID = -1
-
-  def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = {
-    PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
deleted file mode 100644
index 55ef614..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import scala.util.Random
-
-import org.apache.gearpump.Message
-
-/**
- * The idea of ShuffleGroupingPartitioner is derived from Storm.
- * Messages are randomly distributed across the downstream's tasks in a way such that
- * each task is guaranteed to get an equal number of messages.
- */
-class ShuffleGroupingPartitioner extends UnicastPartitioner {
-  private val random = new Random
-  private var index = -1
-  private var partitions = List.empty[Int]
-  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    index += 1
-    if (partitions.isEmpty) {
-      partitions = 0.until(partitionNum).toList
-      partitions = random.shuffle(partitions)
-    } else if (index >= partitionNum) {
-      index = 0
-      partitions = random.shuffle(partitions)
-    }
-    partitions(index)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
deleted file mode 100644
index 5c66d66..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import java.util.Random
-
-import org.apache.gearpump.Message
-
-/**
- * Round Robin partition the data to downstream processor tasks.
- */
-class ShufflePartitioner extends UnicastPartitioner {
-  private var seed = 0
-  private var count = 0
-
-  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-
-    if (seed == 0) {
-      seed = newSeed()
-    }
-
-    val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
-    count = count + 1
-    result
-  }
-
-  private def newSeed(): Int = new Random().nextInt()
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
index f258c0f..cb3d2fd 100644
--- a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
+++ b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
@@ -19,19 +19,18 @@
 package org.apache.gearpump.serializer
 
 import akka.actor.ExtendedActorSystem
-
-import org.apache.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
-import org.apache.gearpump.objenesis.strategy.StdInstantiatorStrategy
-import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper
+import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
+import com.romix.akka.serialization.kryo.{KryoBasedSerializer, KryoSerializer}
 import org.apache.gearpump.serializer.FastKryoSerializer.KryoSerializationException
 import org.apache.gearpump.util.LogUtil
+import org.objenesis.strategy.StdInstantiatorStrategy
 
 class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer {
 
   private val LOG = LogUtil.getLogger(getClass)
   private val config = system.settings.config
 
-  private val kryoSerializer = new KryoSerializerWrapper(system)
+  private val kryoSerializer: KryoBasedSerializer = new KryoSerializer(system).serializer
   private val kryo = kryoSerializer.kryo
   val strategy = new DefaultInstantiatorStrategy
   strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy)
@@ -40,7 +39,7 @@ class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer {
 
   override def serialize(message: Any): Array[Byte] = {
     try {
-      kryoSerializer.toBinary(message)
+      kryoSerializer.toBinary(message.asInstanceOf[AnyRef])
     } catch {
       case ex: java.lang.IllegalArgumentException =>
         val clazz = message.getClass

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
index 524089d..45a5481 100644
--- a/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
+++ b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
@@ -18,10 +18,9 @@
 
 package org.apache.gearpump.serializer
 
+import com.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
 import com.typesafe.config.Config
 import org.slf4j.Logger
-
-import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
 import org.apache.gearpump.util.{Constants, LogUtil}
 
 class GearpumpSerialization(config: Config) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
index 09f2969..82c7fe2 100644
--- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
@@ -21,13 +21,12 @@ package org.apache.gearpump.util
 import org.apache.gearpump.cluster.AppMasterContext
 import org.apache.gearpump.cluster.worker.WorkerId
 
-import scala.concurrent.{ExecutionContext, Future}
-
+import scala.concurrent.{Await, ExecutionContext, Future}
 import akka.actor.Actor.Receive
 import akka.actor._
 import akka.pattern.ask
 import org.slf4j.Logger
-
+import akka.util.Timeout
 import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, GetAllWorkers}
 import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
 import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
@@ -36,6 +35,8 @@ import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSy
 import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
 import org.apache.gearpump.transport.HostPort
 
+import scala.concurrent.duration.Duration
+
 object ActorUtil {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
@@ -136,4 +137,13 @@ object ActorUtil {
     implicit val timeout = Constants.FUTURE_TIMEOUT
     (actor ? msg).asInstanceOf[Future[T]]
   }
+
+  def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout)(implicit ex: ExecutionContext): T = {
+    askActor(actor, msg, timeout, ActorRef.noSender)
+  }
+
+  def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout, sender: ActorRef)
+    (implicit ex: ExecutionContext): T = {
+    Await.result(actor.ask(msg)(timeout, sender).asInstanceOf[Future[T]], Duration.Inf)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Constants.scala b/core/src/main/scala/org/apache/gearpump/util/Constants.scala
index dba5a1f..c98726e 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Constants.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Constants.scala
@@ -20,8 +20,6 @@ package org.apache.gearpump.util
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.gearpump.partitioner._
-
 object Constants {
   val MASTER_WATCHER = "masterwatcher"
   val SINGLETON_MANAGER = "singleton"
@@ -140,14 +138,6 @@ object Constants {
   val GEARPUMP_SERVICE_SUPERVISOR_PATH = "gearpump.services.supervisor-actor-path"
   val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = "gearpump.services.config-render-option-concise"
 
-  // The partitioners provided by Gearpump
-  val BUILTIN_PARTITIONERS = Array(
-    classOf[BroadcastPartitioner],
-    classOf[CoLocationPartitioner],
-    classOf[HashPartitioner],
-    classOf[ShuffleGroupingPartitioner],
-    classOf[ShufflePartitioner])
-
   // Security related
   val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file"
   val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala b/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
index e3df37b..283a64a 100644
--- a/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.util
 import java.io.{File, IOException}
 import java.nio.charset.Charset
 
-import org.apache.gearpump.google.common.io.Files
+import com.google.common.io.Files
 
 object FileUtils {
   private val UTF8 = Charset.forName("UTF-8")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/Util.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Util.scala b/core/src/main/scala/org/apache/gearpump/util/Util.scala
index 0faa46a..8ee0e26 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Util.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Util.scala
@@ -66,8 +66,14 @@ object Util {
   def startProcess(options: Array[String], classPath: Array[String], mainClass: String,
       arguments: Array[String]): RichProcess = {
     val java = System.getProperty("java.home") + "/bin/java"
-    val command = List(java) ++ options ++
-      List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments
+
+    val command = List(java) ++
+      // java.lang.VerifyError will be caused without "-noverify"
+      // TODO: investigate the cause and remove this
+      Array("-noverify") ++
+      options ++
+      List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++
+      arguments
     LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} " +
       s"\n ${options.mkString(" ")}")
     val logger = new ProcessLogRedirector()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
----------------------------------------------------------------------
diff --git a/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
new file mode 100644
index 0000000..c64d444
--- /dev/null
+++ b/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.gearpump.jarstore.local.LocalJarStore
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
new file mode 100644
index 0000000..0a22245
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.pattern.ask
+import akka.testkit.TestActorRef
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
+import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
+import org.apache.gearpump.cluster.master.Master
+import org.apache.gearpump.cluster.worker.Worker
+import org.apache.gearpump.util.Constants
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+class MiniCluster {
+  private val mockMasterIP = "127.0.0.1"
+
+  implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
+    withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP)))
+
+  val (mockMaster, worker) = {
+    val master = system.actorOf(Props(classOf[Master]), "master")
+    val worker = system.actorOf(Props(classOf[Worker], master), "worker")
+
+    // Wait until worker register itself to master
+    waitUtilWorkerIsRegistered(master)
+    (master, worker)
+  }
+
+  def launchActor(props: Props): TestActorRef[Actor] = {
+    TestActorRef(props)
+  }
+
+  private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = {
+    while (!isWorkerRegistered(master)) {}
+  }
+
+  private def isWorkerRegistered(master: ActorRef): Boolean = {
+    import scala.concurrent.duration._
+    implicit val dispatcher = system.dispatcher
+
+    implicit val futureTimeout = Constants.FUTURE_TIMEOUT
+
+    val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]]
+
+    // Waits until the worker is registered.
+    val workers = Await.result[WorkerList](workerListFuture, 15.seconds)
+    workers.workers.size > 0
+  }
+
+  def shutDown(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
new file mode 100644
index 0000000..f9b0762
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, AppManager}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess}
+import org.apache.gearpump.cluster.{TestUtil, _}
+import org.apache.gearpump.util.LogUtil
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.util.Success
+
+class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+  var kvService: TestProbe = null
+  var haService: TestProbe = null
+  var appLauncher: TestProbe = null
+  var appManager: ActorRef = null
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    kvService = TestProbe()(getActorSystem)
+    appLauncher = TestProbe()(getActorSystem)
+
+    appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
+      new DummyAppMasterLauncherFactory(appLauncher))))
+    kvService.expectMsgType[GetKV]
+    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty)))
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "AppManager" should "handle AppMaster message correctly" in {
+    val appMaster = TestProbe()(getActorSystem)
+    val appId = 1
+
+    val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName"))
+    appMaster.send(appManager, register)
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    appMaster.send(appManager, ActivateAppMaster(appId))
+    appMaster.expectMsgType[AppMasterActivated]
+  }
+
+  "DataStoreService" should "support Put and Get" in {
+    val appMaster = TestProbe()(getActorSystem)
+    appMaster.send(appManager, SaveAppData(0, "key", 1))
+    kvService.expectMsgType[PutKV]
+    kvService.reply(PutKVSuccess)
+    appMaster.expectMsg(AppDataSaved)
+
+    appMaster.send(appManager, GetAppData(0, "key"))
+    kvService.expectMsgType[GetKV]
+    kvService.reply(GetKVSuccess("key", 1))
+    appMaster.expectMsg(GetAppDataResult("key", 1))
+  }
+
+  "AppManager" should "support application submission and shutdown" in {
+    testClientSubmission(withRecover = false)
+  }
+
+  "AppManager" should "support application submission and recover if appmaster dies" in {
+    LOG.info("=================testing recover==============")
+    testClientSubmission(withRecover = true)
+  }
+
+  "AppManager" should "handle client message correctly" in {
+    val mockClient = TestProbe()(getActorSystem)
+    mockClient.send(appManager, ShutdownApplication(1))
+    assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure)
+
+    mockClient.send(appManager, ResolveAppId(1))
+    assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
+
+    mockClient.send(appManager, AppMasterDataRequest(1))
+    mockClient.expectMsg(AppMasterData(AppMasterNonExist))
+  }
+
+  "AppManager" should "reject the application submission if the app name already existed" in {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+    val appMaster = TestProbe()(getActorSystem)
+    val worker = TestProbe()(getActorSystem)
+    val appId = 1
+
+    client.send(appManager, submit)
+
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    client.send(appManager, submit)
+    assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
+  }
+
+  def testClientSubmission(withRecover: Boolean): Unit = {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+    val appMaster = TestProbe()(getActorSystem)
+    val worker = TestProbe()(getActorSystem)
+    val appId = 1
+
+    client.send(appManager, submit)
+
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
+    kvService.expectMsgType[PutKV]
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    client.send(appManager, ResolveAppId(appId))
+    client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
+
+    client.send(appManager, AppMastersDataRequest)
+    client.expectMsgType[AppMastersData]
+
+    client.send(appManager, AppMasterDataRequest(appId, false))
+    client.expectMsgType[AppMasterData]
+
+    if (!withRecover) {
+      client.send(appManager, ShutdownApplication(appId))
+      client.expectMsg(ShutdownApplicationResult(Success(appId)))
+    } else {
+      // Do recovery
+      getActorSystem.stop(appMaster.ref)
+      kvService.expectMsgType[GetKV]
+      val appState = ApplicationState(appId, "application1", 1, app, None, "username", null)
+      kvService.reply(GetKVSuccess(APP_STATE, appState))
+      appLauncher.expectMsg(LauncherStarted(appId))
+    }
+  }
+}
+
+class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory {
+  override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
+    Props(new DummyAppMasterLauncher(test, appId))
+  }
+}
+
+class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
+  test.ref ! LauncherStarted(appId)
+  
+  override def receive: Receive = {
+    case any: Any => test.ref forward any
+  }
+}
+
+case class LauncherStarted(appId: Int)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
new file mode 100644
index 0000000..d3e739f
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.Props
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.master.InMemoryKVService
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+class InMemoryKVServiceSpec
+  extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  override def config: Config = TestUtil.MASTER_CONFIG
+
+  "KVService" should "get, put, delete correctly" in {
+    val system = getActorSystem
+    val kvService = system.actorOf(Props(new InMemoryKVService()))
+    val group = "group"
+
+    val client = TestProbe()(system)
+
+    client.send(kvService, PutKV(group, "key", 1))
+    client.expectMsg(PutKVSuccess)
+
+    client.send(kvService, PutKV(group, "key", 2))
+    client.expectMsg(PutKVSuccess)
+
+    client.send(kvService, GetKV(group, "key"))
+    client.expectMsg(GetKVSuccess("key", 2))
+
+    client.send(kvService, DeleteKVGroup(group))
+
+    // After DeleteGroup, it no longer accept Get and Put message for this group.
+    client.send(kvService, GetKV(group, "key"))
+    client.expectNoMsg(3.seconds)
+
+    client.send(kvService, PutKV(group, "key", 3))
+    client.expectNoMsg(3.seconds)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala
new file mode 100644
index 0000000..5f0d5e4
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import akka.util.Timeout
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.client.RunningApplicationSpec.{MockAskAppMasterRequest, MockAskAppMasterResponse}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
+import scala.concurrent.ExecutionContext.Implicits.global
+
+class RunningApplicationSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  implicit var system: ActorSystem = _
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  "RunningApplication" should "be able to shutdown application" in {
+    val errorMsg = "mock exception"
+    val master = TestProbe()
+    val timeout = Timeout(90, TimeUnit.SECONDS)
+    val application = new RunningApplication(1, master.ref, timeout)
+    Future {
+      application.shutDown()
+    }
+    master.expectMsg(ShutdownApplication(1))
+    master.reply(ShutdownApplicationResult(Success(1)))
+
+    val result = Future {
+      intercept[Exception] {
+        application.shutDown()
+      }
+    }
+    master.expectMsg(ShutdownApplication(1))
+    master.reply(ShutdownApplicationResult(Failure(new Exception(errorMsg))))
+    val exception = Await.result(result, Duration.Inf)
+    assert(exception.getMessage.equals(errorMsg))
+  }
+
+  "RunningApplication" should "be able to ask appmaster" in {
+    val master = TestProbe()
+    val appMaster = TestProbe()
+    val appId = 1
+    val timeout = Timeout(90, TimeUnit.SECONDS)
+    val request = MockAskAppMasterRequest("request")
+    val application = new RunningApplication(appId, master.ref, timeout)
+    val future = application.askAppMaster[MockAskAppMasterResponse](request)
+    master.expectMsg(ResolveAppId(appId))
+    master.reply(ResolveAppIdResult(Success(appMaster.ref)))
+    appMaster.expectMsg(MockAskAppMasterRequest("request"))
+    appMaster.reply(MockAskAppMasterResponse("response"))
+    val result = Await.result(future, Duration.Inf)
+    assert(result.res.equals("response"))
+
+    // ResolveAppId should not be called multiple times
+    val future2 = application.askAppMaster[MockAskAppMasterResponse](request)
+    appMaster.expectMsg(MockAskAppMasterRequest("request"))
+    appMaster.reply(MockAskAppMasterResponse("response"))
+    val result2 = Await.result(future2, Duration.Inf)
+    assert(result2.res.equals("response"))
+  }
+}
+
+object RunningApplicationSpec {
+  case class MockAskAppMasterRequest(req: String)
+
+  case class MockAskAppMasterResponse(res: String)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
new file mode 100644
index 0000000..2166976
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.Properties
+
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _}
+import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
+import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{Constants, LogUtil, Util}
+import org.scalatest._
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "Worker" should "register worker address to master when started." in {
+
+    val masterReceiver = createMockMaster()
+
+    val tempTestConf = convertTestConf(getHost, getPort)
+
+    val options = Array(
+      s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
+      s"-D${PREFER_IPV4}=true"
+    ) ++ getMasterListOption()
+
+    val worker = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Worker),
+      Array.empty)
+
+    try {
+      masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
+
+      tempTestConf.delete()
+    } finally {
+      worker.destroy()
+    }
+  }
+
+  "Master" should "accept worker RegisterNewWorker when started" in {
+    val worker = TestProbe()(getActorSystem)
+
+    val host = "127.0.0.1"
+    val port = Util.findFreePort().get
+
+    val properties = new Properties()
+    properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port")
+    properties.put(s"${GEARPUMP_HOSTNAME}", s"$host")
+    val masterConfig = ConfigFactory.parseProperties(properties)
+      .withFallback(TestUtil.MASTER_CONFIG)
+    Future {
+      Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString))
+    }
+
+    val masterProxy = getActorSystem.actorOf(
+      MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
+
+    worker.send(masterProxy, RegisterNewWorker)
+    worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
+  }
+
+  "Info" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
+    masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName"))))
+  }
+
+  "Kill" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Kill.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
+    masterReceiver.reply(ShutdownApplicationResult(Success(0)))
+  }
+
+  "Replay" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Replay.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
+    masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ReplayApplicationResult(Success(0)))
+  }
+
+  "Local" should "be started without exception" in {
+    val port = Util.findFreePort().get
+    val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
+      s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
+      s"-D${PREFER_IPV4}=true")
+
+    val local = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Local),
+      Array.empty)
+
+    def retry(times: Int)(fn: => Boolean): Boolean = {
+
+      LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..")
+
+      val result = fn
+      if (result || times <= 0) {
+        result
+      } else {
+        Thread.sleep(1000)
+        retry(times - 1)(fn)
+      }
+    }
+
+    try {
+      assert(retry(10)(isPortUsed("127.0.0.1", port)),
+        "local is not started successfully, as port is not used " + port)
+    } finally {
+      local.destroy()
+    }
+  }
+
+  "Gear" should "support app|info|kill|shell|replay" in {
+
+    val commands = Array("app", "info", "kill", "shell", "replay")
+
+    assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
+
+    for (command <- commands) {
+      assert(Try(Gear.main(Array("-noexist"))).isFailure,
+        "pass unknown option, throw, command: " + command)
+    }
+
+    assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ")
+
+    val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
+    assert(tryThis.isFailure, "unknown command, throw")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
new file mode 100644
index 0000000..b48fc2a
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.TestUtil
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class MasterWatcherSpec extends FlatSpec with Matchers {
+  def config: Config = TestUtil.MASTER_CONFIG
+
+  "MasterWatcher" should "kill itself when can not get a quorum" in {
+    val system = ActorSystem("ForMasterWatcher", config)
+
+    val actorWatcher = TestProbe()(system)
+
+    val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
+    actorWatcher watch masterWatcher
+    actorWatcher.expectTerminated(masterWatcher, 5.seconds)
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
new file mode 100644
index 0000000..8a3d7d1
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.duration._
+
+class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
+  with WordSpecLike with Matchers with BeforeAndAfterAll{
+
+  def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
+  val appId = 0
+  val workerId1: WorkerId = WorkerId(1, 0L)
+  val workerId2: WorkerId = WorkerId(2, 0L)
+  val mockAppMaster = TestProbe()
+  val mockWorker1 = TestProbe()
+  val mockWorker2 = TestProbe()
+
+  override def afterAll {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The scheduler" should {
+    "update resource only when the worker is registered" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
+      expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " +
+        s"registered into master"))
+    }
+
+    "drop application's resource requests when the application is removed" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      mockAppMaster.expectNoMsg(5.seconds)
+    }
+  }
+
+  def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = {
+    left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
+  }
+
+  "The resource request with higher priority" should {
+    "be handled first" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY)
+      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The resource request which delivered earlier" should {
+    "be handled first if the priorities are the same" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different relaxation" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER)
+      val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      val request3 = ResourceRequest(
+        Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+
+      expect = ResourceAllocated(Array(
+        ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
+        ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      // We have to manually update the resource on each worker
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different executor number" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      // By default, the request requires only one executor
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations2.allocations.length == 1)
+      assert(allocations2.allocations.head.resource == Resource(20))
+
+      val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations3.allocations.length == 3)
+      assert(allocations3.allocations.forall(_.resource == Resource(8)))
+
+      // The total available resource can not satisfy the requirements with executor number
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+      val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations4.allocations.length == 2)
+      assert(allocations4.allocations.forall(_.resource == Resource(20)))
+
+      // When new resources are available, the remaining request will be satisfied
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
+      val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations5.allocations.length == 1)
+      assert(allocations4.allocations.forall(_.resource == Resource(20)))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
new file mode 100644
index 0000000..e0233f8
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import akka.actor.{ActorSystem, PoisonPill, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed}
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
+import org.scalatest._
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  val appId = 1
+  val workerId: WorkerId = WorkerId(1, 0L)
+  val executorId = 1
+  var masterProxy: TestProbe = null
+  var mockMaster: TestProbe = null
+  var client: TestProbe = null
+  val workerSlots = 50
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    mockMaster = TestProbe()(getActorSystem)
+    masterProxy = TestProbe()(getActorSystem)
+    client = TestProbe()(getActorSystem)
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "The new started worker" should {
+    "kill itself if no response from Master after registering" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+      mockMaster watch worker
+      mockMaster.expectMsg(RegisterNewWorker)
+      mockMaster.expectTerminated(worker, 60.seconds)
+    }
+  }
+
+  "Worker" should {
+    "init its resource from the gearpump config" in {
+      val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots").
+        withFallback(TestUtil.DEFAULT_CONFIG)
+      val workerSystem = ActorSystem("WorkerSystem", config)
+      val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+      mockMaster watch worker
+      mockMaster.expectMsg(RegisterNewWorker)
+
+      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots)))
+
+      worker.tell(
+        UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
+      mockMaster.expectTerminated(worker, 5.seconds)
+      workerSystem.terminate()
+      Await.result(workerSystem.whenTerminated, Duration.Inf)
+    }
+  }
+
+  "Worker" should {
+    "update its remaining resource when launching and shutting down executors" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref))
+      masterProxy.expectMsg(RegisterNewWorker)
+
+      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+
+      val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
+      // This is an actor path which the ActorSystemBooter will report back to,
+      // not needed in this test
+      val reportBack = "dummy"
+      val executionContext = ExecutorJVMConfig(Array.empty[String],
+        getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "),
+        classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None,
+        username = "user")
+
+      // Test LaunchExecutor
+      worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext),
+        mockMaster.ref)
+      mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine"))
+
+      worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
+
+      worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
+
+      // Test terminationWatch
+      worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+      client.expectMsg(ShutdownExecutorSucceed(1, 1))
+
+      worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref)
+      client.expectMsg(ShutdownExecutorFailed(
+        s"Can not find executor ${executorId + 1} for app $appId"))
+
+      mockMaster.ref ! PoisonPill
+      masterProxy.expectMsg(RegisterWorker(workerId))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
index c99a031..39b6261 100644
--- a/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
@@ -22,9 +22,9 @@ import java.io.File
 import java.util.concurrent.TimeUnit
 
 import akka.actor.ActorSystem
-import com.typesafe.config.{ConfigValueFactory, ConfigValue}
+import com.google.common.io.Files
+import com.typesafe.config.ConfigValueFactory
 import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.google.common.io.Files
 import org.apache.gearpump.jarstore.local.LocalJarStore
 import org.apache.gearpump.util.{FileUtils, LogUtil}
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -39,7 +39,7 @@ class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
   val host = "localhost"
   private val LOG = LogUtil.getLogger(getClass)
 
-  var system: ActorSystem = null
+  var system: ActorSystem = _
 
   override def afterAll {
     if (null != system) {
@@ -75,7 +75,6 @@ class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
 
   "The file server" should {
     "serve the data previously stored" in {
-
       val rootDir = Files.createTempDir()
       val localJarStore: JarStore = new LocalJarStore
       val conf = TestUtil.DEFAULT_CONFIG.withValue("gearpump.jarstore.rootpath",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
index 5881640..0855553 100644
--- a/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
@@ -18,12 +18,13 @@
 
 package org.apache.gearpump.metrics
 
+import com.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
+
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
 
-import org.apache.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
 
 class MetricsSpec extends FlatSpec with Matchers with MockitoSugar {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
deleted file mode 100644
index fcf819b..0000000
--- a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.gearpump.Message
-
-class PartitionerSpec extends FlatSpec with Matchers {
-  val NUM = 10
-
-  "HashPartitioner" should "hash same key to same slots" in {
-    val partitioner = new HashPartitioner
-
-    val data = new Array[Byte](1000)
-    (new java.util.Random()).nextBytes(data)
-    val msg = Message(data)
-
-    val partition = partitioner.getPartition(msg, NUM)
-    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
-    assert(partition == partitioner.getPartition(msg, NUM), "multiple run should return" +
-      "consistent result")
-  }
-
-  "ShufflePartitioner" should "hash same key randomly" in {
-    val partitioner = new ShufflePartitioner
-
-    val data = new Array[Byte](1000)
-    (new java.util.Random()).nextBytes(data)
-    val msg = Message(data)
-
-    val partition = partitioner.getPartition(msg, NUM)
-    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
-    assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" +
-      "consistent result")
-  }
-
-  "BroadcastPartitioner" should "return all partitions" in {
-    val partitioner = new BroadcastPartitioner
-
-    val data = new Array[Byte](1000)
-    (new java.util.Random()).nextBytes(data)
-    val msg = Message(data)
-    val partitions = partitioner.getPartitions(msg, NUM)
-
-    partitions should contain theSameElementsAs 0.until(NUM)
-  }
-
-
-  "ShuffleGroupingPartitioner" should "hash same key randomly" in {
-    val partitioner = new ShuffleGroupingPartitioner
-
-    val data = new Array[Byte](1000)
-    (new java.util.Random()).nextBytes(data)
-    val msg = Message(data)
-
-    val partition = partitioner.getPartition(msg, NUM)
-    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
-    assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" +
-      "consistent result")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
index f4bd114..0772a5e 100644
--- a/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
@@ -18,20 +18,23 @@
 
 package org.apache.gearpump.serializer
 
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
 import akka.actor.{ActorSystem, ExtendedActorSystem}
+
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
 import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
 
 import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.esotericsoftware.kryo.io.{Input, Output}
-import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
 import org.apache.gearpump.serializer.SerializerSpec._
 
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+
 class SerializerSpec extends FlatSpec with Matchers with MockitoSugar {
   val config = ConfigFactory.empty.withValue("gearpump.serializers",
     ConfigValueFactory.fromAnyRef(Map(classOf[ClassA].getName -> classOf[ClassASerializer].getName,
@@ -70,7 +73,7 @@ object SerializerSpec {
 
   class ClassASerializer extends KryoSerializer[ClassA] {
     override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = {
-      output.writeString(classOf[ClassA].getName.toString)
+      output.writeString(classOf[ClassA].getName)
     }
 
     override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
index 66abc36..97b35ad 100644
--- a/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
@@ -18,13 +18,13 @@
 
 package org.apache.gearpump.util
 
+import com.google.common.io.Files
+
 import java.io.File
 import java.util
 
 import org.scalatest.FlatSpec
 
-import org.apache.gearpump.google.common.io.Files
-
 class FileUtilsSpec extends FlatSpec {
   val TXT =
     """

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
----------------------------------------------------------------------
diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
deleted file mode 100644
index e173a8a..0000000
--- a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.gearpump.jarstore.local.LocalJarStore
-org.apache.gearpump.jarstore.dfs.DFSJarStore
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
deleted file mode 100644
index 9e55be6..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster
-
-import akka.actor.ActorRef
-
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.worker.WorkerId
-
-/**
- * Cluster Bootup Flow
- */
-object WorkerToMaster {
-
-  /** When an worker is started, it sends RegisterNewWorker */
-  case object RegisterNewWorker
-
-  /** When worker lose connection with master, it tries to register itself again with old Id. */
-  case class RegisterWorker(workerId: WorkerId)
-
-  /** Worker is responsible to broadcast its current status to master */
-  case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
-}
-
-object MasterToWorker {
-
-  /** Master confirm the reception of RegisterNewWorker or RegisterWorker */
-  case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
-
-  /** Worker have not received reply from master */
-  case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
-
-  /** Master is synced with worker on resource slots managed by current worker */
-  case object UpdateResourceSucceed
-}
\ No newline at end of file