You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:44 UTC

[34/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala
deleted file mode 100644
index 6eeba58..0000000
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import com.typesafe.config.Config
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class DistributedShellSpec
-  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
-
-  before {
-    startActorSystem()
-  }
-
-  after {
-    shutdownActorSystem()
-  }
-
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  property("DistributedShell should succeed to submit application with required arguments") {
-    val requiredArgs = Array.empty[String]
-
-    val masterReceiver = createMockMaster()
-
-    Future {
-      DistributedShell.main(masterConfig, requiredArgs)
-    }
-
-    masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
-    masterReceiver.reply(SubmitApplicationResult(Success(0)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
deleted file mode 100644
index d59981b..0000000
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
-
-import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommandResult, ShellCommandResultAggregator}
-
-class ShellCommandResultAggregatorSpec extends WordSpec with Matchers with BeforeAndAfter {
-  "ShellCommandResultAggregator" should {
-    "aggregate ShellCommandResult" in {
-      val executorId1 = 1
-      val executorId2 = 2
-      val responseBuilder = new ShellCommandResultAggregator
-      val response1 = ShellCommandResult(executorId1, "task1")
-      val response2 = ShellCommandResult(executorId2, "task2")
-      val result = responseBuilder.aggregate(response1).aggregate(response2).toString()
-      val expected = s"Execute results from executor $executorId1 : \ntask1\n" +
-        s"Execute results from executor $executorId2 : \ntask2\n"
-      assert(result == expected)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
deleted file mode 100644
index b301973..0000000
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.sys.process._
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.cluster.appmaster.WorkerInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig}
-import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, ShellCommandResult}
-
-class ShellExecutorSpec extends WordSpec with Matchers {
-
-  "ShellExecutor" should {
-    "execute the shell command and return the result" in {
-      val executorId = 1
-      val workerId = WorkerId(2, 0L)
-      val appId = 0
-      val appName = "app"
-      val resource = Resource(1)
-      implicit val system = ActorSystem("ShellExecutor", TestUtil.DEFAULT_CONFIG)
-      val mockMaster = TestProbe()(system)
-      val worker = TestProbe()
-      val workerInfo = WorkerInfo(workerId, worker.ref)
-      val executorContext = ExecutorContext(executorId, workerInfo, appId, appName,
-        mockMaster.ref, resource)
-      val executor = system.actorOf(Props(classOf[ShellExecutor], executorContext,
-        UserConfig.empty))
-
-      val process = Try(s"ls /".!!)
-      val result = process match {
-        case Success(msg) => msg
-        case Failure(ex) => ex.getMessage
-      }
-      executor.tell(ShellCommand("ls /"), mockMaster.ref)
-      assert(mockMaster.receiveN(1).head.asInstanceOf[ShellCommandResult].equals(
-        ShellCommandResult(executorId, result)))
-
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
new file mode 100644
index 0000000..e22abaf
--- /dev/null
+++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import akka.testkit.{TestActorRef, TestProbe}
+import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource}
+import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList}
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
+import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
+import org.apache.gearpump.util.ActorUtil
+
+class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter {
+  implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG)
+  val mockMaster = TestProbe()(system)
+  val mockWorker1 = TestProbe()(system)
+  val masterProxy = mockMaster.ref
+  val appId = 0
+  val userName = "test"
+  val masterExecutorId = 0
+  val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
+  val resource = Resource(1)
+  val appJar = None
+  val appDescription = AppDescription("app0", classOf[DistShellAppMaster].getName, UserConfig.empty)
+
+  "DistributedShell AppMaster" should {
+    "launch one ShellTask on each worker" in {
+      val appMasterInfo = AppMasterRuntimeInfo(appId, appName = appId.toString)
+      val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar,
+        masterProxy, appMasterInfo)
+      TestActorRef[DistShellAppMaster](
+        AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext))
+      mockMaster.expectMsgType[RegisterAppMaster]
+      mockMaster.reply(AppMasterRegistered(appId))
+      // The DistributedShell AppMaster asks for worker list from Master.
+      mockMaster.expectMsg(GetAllWorkers)
+      mockMaster.reply(WorkerList(workerList))
+      // After worker list is ready, DistributedShell AppMaster requests resource on each worker
+      workerList.foreach { workerId =>
+        mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId,
+          relaxation = Relaxation.SPECIFICWORKER)))
+      }
+      mockMaster.reply(ResourceAllocated(
+        Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
+      mockWorker1.expectMsgClass(classOf[LaunchExecutor])
+      mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
+    }
+  }
+
+  after {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
new file mode 100644
index 0000000..7cfd07a
--- /dev/null
+++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.ResolveAppId
+import org.apache.gearpump.cluster.MasterToClient.ResolveAppIdResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand
+import org.apache.gearpump.util.LogUtil
+
+class DistributedShellClientSpec
+  extends PropSpec with Matchers with BeforeAndAfter with MasterHarness {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  before {
+    startActorSystem()
+  }
+
+  after {
+    shutdownActorSystem()
+  }
+
+  protected override def config = TestUtil.DEFAULT_CONFIG
+
+  property("DistributedShellClient should succeed to submit application with required arguments") {
+    val command = "ls /"
+    val requiredArgs = Array("-appid", "0", "-command", command)
+    val masterReceiver = createMockMaster()
+
+    assert(Try(DistributedShellClient.main(Array.empty[String])).isFailure,
+      "missing required arguments, print usage")
+
+    Future {
+      DistributedShellClient.main(masterConfig, requiredArgs)
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, ResolveAppId(0))
+    val mockAppMaster = TestProbe()(getActorSystem)
+    masterReceiver.reply(ResolveAppIdResult(Success(mockAppMaster.ref)))
+    LOG.info(s"Reply back ResolveAppIdResult, current actorRef: ${mockAppMaster.ref.path.toString}")
+    mockAppMaster.expectMsg(PROCESS_BOOT_TIME, ShellCommand(command))
+    mockAppMaster.reply("result")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala
new file mode 100644
index 0000000..51b5ec3
--- /dev/null
+++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+class DistributedShellSpec
+  extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+  before {
+    startActorSystem()
+  }
+
+  after {
+    shutdownActorSystem()
+  }
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  property("DistributedShell should succeed to submit application with required arguments") {
+    val requiredArgs = Array.empty[String]
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      DistributedShell.main(masterConfig, requiredArgs)
+    }
+
+    masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+    masterReceiver.reply(SubmitApplicationResult(Success(0)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
new file mode 100644
index 0000000..11350a6
--- /dev/null
+++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
+
+import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommandResult, ShellCommandResultAggregator}
+
+class ShellCommandResultAggregatorSpec extends WordSpec with Matchers with BeforeAndAfter {
+  "ShellCommandResultAggregator" should {
+    "aggregate ShellCommandResult" in {
+      val executorId1 = 1
+      val executorId2 = 2
+      val responseBuilder = new ShellCommandResultAggregator
+      val response1 = ShellCommandResult(executorId1, "task1")
+      val response2 = ShellCommandResult(executorId2, "task2")
+      val result = responseBuilder.aggregate(response1).aggregate(response2).toString()
+      val expected = s"Execute results from executor $executorId1 : \ntask1\n" +
+        s"Execute results from executor $executorId2 : \ntask2\n"
+      assert(result == expected)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala
new file mode 100644
index 0000000..e7a3a21
--- /dev/null
+++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.sys.process._
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig}
+import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, ShellCommandResult}
+
+class ShellExecutorSpec extends WordSpec with Matchers {
+
+  "ShellExecutor" should {
+    "execute the shell command and return the result" in {
+      val executorId = 1
+      val workerId = WorkerId(2, 0L)
+      val appId = 0
+      val appName = "app"
+      val resource = Resource(1)
+      implicit val system = ActorSystem("ShellExecutor", TestUtil.DEFAULT_CONFIG)
+      val mockMaster = TestProbe()(system)
+      val worker = TestProbe()
+      val workerInfo = WorkerInfo(workerId, worker.ref)
+      val executorContext = ExecutorContext(executorId, workerInfo, appId, appName,
+        mockMaster.ref, resource)
+      val executor = system.actorOf(Props(classOf[ShellExecutor], executorContext,
+        UserConfig.empty))
+
+      val process = Try(s"ls /".!!)
+      val result = process match {
+        case Success(msg) => msg
+        case Failure(ex) => ex.getMessage
+      }
+      executor.tell(ShellCommand("ls /"), mockMaster.ref)
+      assert(mockMaster.receiveN(1).head.asInstanceOf[ShellCommandResult].equals(
+        ShellCommandResult(executorId, result)))
+
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/README.md
----------------------------------------------------------------------
diff --git a/examples/distributeservice/README.md b/examples/distributeservice/README.md
index 82b3726..65d41f6 100644
--- a/examples/distributeservice/README.md
+++ b/examples/distributeservice/README.md
@@ -6,12 +6,12 @@ In order to run the example:
 
   2. Start the AppMaster:<br>
   ```bash
-  target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar io.gearpump.distributeservice.DistributeService
+  target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar org.apache.gearpump.distributeservice.DistributeService
   ```
   3. Distribute the service:<br>
   ```bash
   target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar
-  io.gearpump.distributeservice.DistributeServiceClient -appid $APPID -file ${File_Path}
+  org.apache.gearpump.distributeservice.DistributeServiceClient -appid $APPID -file ${File_Path}
   -script ${Script_Path} -serviceName ${Service_Name} -target ${Target_Path} -Dkey1=value1 -Dkey2=value2
   ```<br>
   This command will distribute the service zip file(variable ```file```) to the target path(variable ```target```), then copy the script to

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
deleted file mode 100644
index a220dc6..0000000
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.distributeservice
-
-import java.io.File
-import scala.concurrent.Future
-
-import akka.actor.{Deploy, Props}
-import akka.pattern.{ask, pipe}
-import akka.remote.RemoteScope
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClientToMaster.ShutdownApplication
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout}
-import io.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext}
-import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
-import io.gearpump.util._
-
-class DistServiceAppMaster(appContext: AppMasterContext, app: AppDescription)
-  extends ApplicationMaster {
-  import appContext._
-  import context.dispatcher
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
-  private var currentExecutorId = 0
-  private var fileServerPort = -1
-
-  val rootDirectory = new File("/")
-  val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
-  val server = context.actorOf(Props(classOf[FileServer], rootDirectory, host, 0))
-
-  override def preStart(): Unit = {
-    LOG.info(s"Distribute Service AppMaster started")
-    ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, self)
-  }
-
-  (server ? FileServer.GetPort).asInstanceOf[Future[FileServer.Port]] pipeTo self
-
-  override def receive: Receive = {
-    case ExecutorSystemStarted(executorSystem, _) =>
-      import executorSystem.{address, resource => executorResource, worker}
-      val executorContext = ExecutorContext(currentExecutorId, worker,
-        appId, app.name, self, executorResource)
-      // start executor
-      val executor = context.actorOf(Props(classOf[DistServiceExecutor],
-        executorContext, app.userConfig).withDeploy(
-        Deploy(scope = RemoteScope(address))), currentExecutorId.toString)
-      executorSystem.bindLifeCycleWith(executor)
-      currentExecutorId += 1
-    case StartExecutorSystemTimeout =>
-      LOG.error(s"Failed to allocate resource in time")
-      masterProxy ! ShutdownApplication(appId)
-      context.stop(self)
-    case FileServer.Port(port) =>
-      this.fileServerPort = port
-    case GetFileContainer =>
-      val name = Math.abs(new java.util.Random().nextLong()).toString
-      sender ! new FileContainer(s"http://$host:$fileServerPort/$name")
-    case installService: InstallService =>
-      context.children.foreach(_ ! installService)
-  }
-
-  private def getExecutorJvmConfig: ExecutorSystemJvmConfig = {
-    val config: Config = app.clusterConfig
-    val jvmSetting = Util.resolveJvmSetting(
-      config.withFallback(context.system.settings.config)).executor
-    ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs,
-      appJar, username, config)
-  }
-}
-
-object DistServiceAppMaster {
-  case object GetFileContainer
-
-  case class FileContainer(url: String)
-
-  case class InstallService(
-      url: String,
-      zipFileName: String,
-      targetPath: String,
-      script: Array[Byte],
-      serviceName: String,
-      serviceSettings: Map[String, Any])
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala
deleted file mode 100644
index 4a2a876..0000000
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.distributeservice
-
-import java.io.{File, FileWriter}
-import java.net.InetAddress
-import scala.collection.JavaConverters._
-import scala.io.Source
-import scala.sys.process._
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.Actor
-import org.apache.commons.io.FileUtils
-import org.apache.commons.lang.text.StrSubstitutor
-import org.slf4j.Logger
-
-import io.gearpump.cluster.{ExecutorContext, UserConfig}
-import io.gearpump.experiments.distributeservice.DistServiceAppMaster.InstallService
-import io.gearpump.util.{ActorUtil, LogUtil}
-
-class DistServiceExecutor(executorContext: ExecutorContext, userConf: UserConfig) extends Actor {
-  import executorContext._
-  private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)
-
-  override def receive: Receive = {
-    case InstallService(url, zipFileName, targetPath, scriptData, serviceName, serviceSettings) =>
-      LOG.info(s"Executor $executorId receive command to install " +
-        s"service $serviceName to $targetPath")
-      unzipFile(url, zipFileName, targetPath)
-      installService(scriptData, serviceName, serviceSettings)
-  }
-
-  private def unzipFile(url: String, zipFileName: String, targetPath: String) = {
-    val zipFile = File.createTempFile(System.currentTimeMillis().toString, zipFileName)
-    val dir = new File(targetPath)
-    if (dir.exists()) {
-      FileUtils.forceDelete(dir)
-    }
-    val bytes = FileServer.newClient.get(url).get
-    FileUtils.writeByteArrayToFile(zipFile, bytes)
-    val result = Try(s"unzip ${zipFile.getAbsolutePath} -d $targetPath".!!)
-    result match {
-      case Success(msg) => LOG.info(s"Executor $executorId unzip file to $targetPath")
-      case Failure(ex) => throw ex
-    }
-  }
-
-  private def installService(
-      scriptData: Array[Byte], serviceName: String, serviceSettings: Map[String, Any]) = {
-    val tempFile = File.createTempFile("gearpump", serviceName)
-    FileUtils.writeByteArrayToFile(tempFile, scriptData)
-    val script = new File("/etc/init.d", serviceName)
-    writeFileWithEnvVariables(tempFile, script, serviceSettings ++ getEnvSettings)
-    val result = Try(s"chkconfig --add $serviceName".!!)
-    result match {
-      case Success(msg) => LOG.info(s"Executor install service $serviceName successfully!")
-      case Failure(ex) => throw ex
-    }
-  }
-
-  private def getEnvSettings: Map[String, Any] = {
-    Map("workerId" -> worker,
-      "localhost" -> ActorUtil.getSystemAddress(context.system).host.get,
-      "hostname" -> InetAddress.getLocalHost.getHostName)
-  }
-
-  private def writeFileWithEnvVariables(source: File, target: File, envs: Map[String, Any]) = {
-    val writer = new FileWriter(target)
-    val sub = new StrSubstitutor(envs.asJava)
-    sub.setEnableSubstitutionInVariables(true)
-    Source.fromFile(source).getLines().foreach(line => writer.write(sub.replace(line) + "\r\n"))
-    writer.close()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala
deleted file mode 100644
index 522dc5e..0000000
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.distributeservice
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.cluster.{Application, UserConfig}
-import io.gearpump.util.{AkkaApp, LogUtil}
-
-/** Demo app to remotely deploy and start system service on machines in the cluster */
-object DistributeService extends AkkaApp with ArgumentsParser {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array.empty
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    LOG.info(s"Distribute Service submitting application...")
-    val context = ClientContext(akkaConf)
-    val appId = context.submit(Application[DistServiceAppMaster]("DistributedService",
-      UserConfig.empty))
-    context.close()
-    LOG.info(s"Distribute Service Application started with appId $appId !")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala
deleted file mode 100644
index 0d85001..0000000
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.distributeservice
-
-import java.io.File
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-import akka.pattern.ask
-import org.apache.commons.io.FileUtils
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
-import io.gearpump.util.{AkkaApp, Constants}
-
-/** Client to submit the service jar */
-object DistributeServiceClient extends AkkaApp with ArgumentsParser {
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "appid" -> CLIOption[Int]("<the distributed shell appid>", required = true),
-    "file" -> CLIOption[String]("<service zip file path>", required = true),
-    "script" -> CLIOption[String](
-      "<file path of service script that will be installed to /etc/init.d>", required = true),
-    "serviceName" -> CLIOption[String]("<service name>", required = true),
-    "target" -> CLIOption[String]("<target path on each machine>", required = true)
-  )
-
-  override def help(): Unit = {
-    super.help()
-    // scalastyle:off println
-    Console.err.println(s"-D<name>=<value> set a property to the service")
-    // scalastyle:on println
-  }
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(filterCustomOptions(args))
-    val context = ClientContext(akkaConf)
-    implicit val system = context.system
-    implicit val dispatcher = system.dispatcher
-    val appid = config.getInt("appid")
-    val zipFile = new File(config.getString("file"))
-    val script = new File(config.getString("script"))
-    val serviceName = config.getString("serviceName")
-    val appMaster = context.resolveAppID(appid)
-    (appMaster ? GetFileContainer).asInstanceOf[Future[FileContainer]].map { container =>
-      val bytes = FileUtils.readFileToByteArray(zipFile)
-      val result = FileServer.newClient.save(container.url, bytes)
-      result match {
-        case Success(_) =>
-          appMaster ! InstallService(container.url, zipFile.getName, config.getString("target"),
-            FileUtils.readFileToByteArray(script), serviceName, parseServiceConfig(args))
-          context.close()
-        case Failure(ex) => throw ex
-      }
-    }
-  }
-
-  private def filterCustomOptions(args: Array[String]): Array[String] = {
-    args.filter(!_.startsWith("-D"))
-  }
-
-  private def parseServiceConfig(args: Array[String]): Map[String, Any] = {
-    val result = Map.empty[String, Any]
-    args.foldLeft(result) { (result, argument) =>
-      if (argument.startsWith("-D") && argument.contains("=")) {
-        val fixedKV = argument.substring(2).split("=")
-        result + (fixedKV(0) -> fixedKV(1))
-      } else {
-        result
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala
deleted file mode 100644
index ed0b24d..0000000
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.distributeservice
-
-import java.io.File
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.{Actor, Stash}
-import akka.io.IO
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.{ByteArrayRequestEntity, GetMethod, PostMethod}
-import spray.can.Http
-import spray.http.HttpMethods._
-import spray.http._
-
-import io.gearpump.util.{FileUtils, LogUtil}
-
-/**
- *
- * Should not use this to server too big files(more than 100MB), otherwise OOM may happen.
- *
- * port: set port to 0 if you want to bind to random port
- */
-class FileServer(rootDir: File, host: String, port: Int) extends Actor with Stash {
-  private val LOG = LogUtil.getLogger(getClass)
-
-  implicit val system = context.system
-
-  override def preStart(): Unit = {
-    // Creates http server
-    IO(Http) ! Http.Bind(self, host, port)
-  }
-
-  override def postStop(): Unit = {
-    // Stop the server
-    IO(Http) ! Http.Unbind
-  }
-
-  override def receive: Receive = {
-    case Http.Bound(address) =>
-      LOG.info(s"FileServer bound on port: ${address.getPort}")
-      context.become(listen(address.getPort))
-      unstashAll()
-    case _ =>
-      stash()
-  }
-
-  def listen(port: Int): Receive = {
-    case FileServer.GetPort => {
-      sender ! FileServer.Port(port)
-    }
-    case Http.Connected(remote, _) =>
-      sender ! Http.Register(self)
-
-    // Fetches files from remote uri
-    case HttpRequest(GET, uri, _, _, _) =>
-      val child = uri.path.toString()
-      val payload = Try {
-        val source = new File(rootDir, child)
-        FileUtils.readFileToByteArray(source)
-      }
-      payload match {
-        case Success(data) =>
-          sender ! HttpResponse(entity = HttpEntity(data))
-        case Failure(ex) =>
-          LOG.error("failed to get file " + ex.getMessage)
-          sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage)
-      }
-    // Save file to remote uri
-    case post@HttpRequest(POST, uri, _, _, _) =>
-      val child = uri.path.toString()
-
-      val status = Try {
-        val target = new File(rootDir, child)
-        val payload = post.entity.data.toByteArray
-        FileUtils.writeByteArrayToFile(target, payload)
-        "OK"
-      }
-      status match {
-        case Success(message) => sender ! HttpResponse(entity = message)
-        case Failure(ex) =>
-          LOG.error("save file failed " + ex.getMessage)
-          sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage)
-      }
-  }
-}
-
-object FileServer {
-  object GetPort
-  case class Port(port: Int)
-
-  def newClient: Client = new Client
-
-  class Client {
-    val client = new HttpClient()
-
-    def save(uri: String, data: Array[Byte]): Try[Int] = {
-      Try {
-        val post = new PostMethod(uri)
-        val entity = new ByteArrayRequestEntity(data)
-        post.setRequestEntity(entity)
-        client.executeMethod(post)
-      }
-    }
-
-    def get(uri: String): Try[Array[Byte]] = {
-      val get = new GetMethod(uri)
-      val status = Try {
-        client.executeMethod(get)
-      }
-
-      val data = status.flatMap { code =>
-        if (code == 200) {
-          Success(get.getResponseBody())
-        } else {
-          Failure(new Exception(s"We cannot get the data, the status code is $code"))
-        }
-      }
-      data
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
new file mode 100644
index 0000000..ca0ab49
--- /dev/null
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.distributeservice
+
+import java.io.File
+import scala.concurrent.Future
+
+import akka.actor.{Deploy, Props}
+import akka.pattern.{ask, pipe}
+import akka.remote.RemoteScope
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout}
+import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext}
+import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
+import org.apache.gearpump.util._
+
+class DistServiceAppMaster(appContext: AppMasterContext, app: AppDescription)
+  extends ApplicationMaster {
+  import appContext._
+  import context.dispatcher
+  implicit val timeout = Constants.FUTURE_TIMEOUT
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+  private var currentExecutorId = 0
+  private var fileServerPort = -1
+
+  val rootDirectory = new File("/")
+  val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
+  val server = context.actorOf(Props(classOf[FileServer], rootDirectory, host, 0))
+
+  override def preStart(): Unit = {
+    LOG.info(s"Distribute Service AppMaster started")
+    ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, self)
+  }
+
+  (server ? FileServer.GetPort).asInstanceOf[Future[FileServer.Port]] pipeTo self
+
+  override def receive: Receive = {
+    case ExecutorSystemStarted(executorSystem, _) =>
+      import executorSystem.{address, resource => executorResource, worker}
+      val executorContext = ExecutorContext(currentExecutorId, worker,
+        appId, app.name, self, executorResource)
+      // start executor
+      val executor = context.actorOf(Props(classOf[DistServiceExecutor],
+        executorContext, app.userConfig).withDeploy(
+        Deploy(scope = RemoteScope(address))), currentExecutorId.toString)
+      executorSystem.bindLifeCycleWith(executor)
+      currentExecutorId += 1
+    case StartExecutorSystemTimeout =>
+      LOG.error(s"Failed to allocate resource in time")
+      masterProxy ! ShutdownApplication(appId)
+      context.stop(self)
+    case FileServer.Port(port) =>
+      this.fileServerPort = port
+    case GetFileContainer =>
+      val name = Math.abs(new java.util.Random().nextLong()).toString
+      sender ! new FileContainer(s"http://$host:$fileServerPort/$name")
+    case installService: InstallService =>
+      context.children.foreach(_ ! installService)
+  }
+
+  private def getExecutorJvmConfig: ExecutorSystemJvmConfig = {
+    val config: Config = app.clusterConfig
+    val jvmSetting = Util.resolveJvmSetting(
+      config.withFallback(context.system.settings.config)).executor
+    ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs,
+      appJar, username, config)
+  }
+}
+
+object DistServiceAppMaster {
+  case object GetFileContainer
+
+  case class FileContainer(url: String)
+
+  case class InstallService(
+      url: String,
+      zipFileName: String,
+      targetPath: String,
+      script: Array[Byte],
+      serviceName: String,
+      serviceSettings: Map[String, Any])
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala
new file mode 100644
index 0000000..248156f
--- /dev/null
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.distributeservice
+
+import java.io.{File, FileWriter}
+import java.net.InetAddress
+import scala.collection.JavaConverters._
+import scala.io.Source
+import scala.sys.process._
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.Actor
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang.text.StrSubstitutor
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.{ExecutorContext, UserConfig}
+import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.InstallService
+import org.apache.gearpump.util.{ActorUtil, LogUtil}
+
+class DistServiceExecutor(executorContext: ExecutorContext, userConf: UserConfig) extends Actor {
+  import executorContext._
+  private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)
+
+  override def receive: Receive = {
+    case InstallService(url, zipFileName, targetPath, scriptData, serviceName, serviceSettings) =>
+      LOG.info(s"Executor $executorId receive command to install " +
+        s"service $serviceName to $targetPath")
+      unzipFile(url, zipFileName, targetPath)
+      installService(scriptData, serviceName, serviceSettings)
+  }
+
+  private def unzipFile(url: String, zipFileName: String, targetPath: String) = {
+    val zipFile = File.createTempFile(System.currentTimeMillis().toString, zipFileName)
+    val dir = new File(targetPath)
+    if (dir.exists()) {
+      FileUtils.forceDelete(dir)
+    }
+    val bytes = FileServer.newClient.get(url).get
+    FileUtils.writeByteArrayToFile(zipFile, bytes)
+    val result = Try(s"unzip ${zipFile.getAbsolutePath} -d $targetPath".!!)
+    result match {
+      case Success(msg) => LOG.info(s"Executor $executorId unzip file to $targetPath")
+      case Failure(ex) => throw ex
+    }
+  }
+
+  private def installService(
+      scriptData: Array[Byte], serviceName: String, serviceSettings: Map[String, Any]) = {
+    val tempFile = File.createTempFile("gearpump", serviceName)
+    FileUtils.writeByteArrayToFile(tempFile, scriptData)
+    val script = new File("/etc/init.d", serviceName)
+    writeFileWithEnvVariables(tempFile, script, serviceSettings ++ getEnvSettings)
+    val result = Try(s"chkconfig --add $serviceName".!!)
+    result match {
+      case Success(msg) => LOG.info(s"Executor install service $serviceName successfully!")
+      case Failure(ex) => throw ex
+    }
+  }
+
+  private def getEnvSettings: Map[String, Any] = {
+    Map("workerId" -> worker,
+      "localhost" -> ActorUtil.getSystemAddress(context.system).host.get,
+      "hostname" -> InetAddress.getLocalHost.getHostName)
+  }
+
+  private def writeFileWithEnvVariables(source: File, target: File, envs: Map[String, Any]) = {
+    val writer = new FileWriter(target)
+    val sub = new StrSubstitutor(envs.asJava)
+    sub.setEnableSubstitutionInVariables(true)
+    Source.fromFile(source).getLines().foreach(line => writer.write(sub.replace(line) + "\r\n"))
+    writer.close()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
new file mode 100644
index 0000000..df7a517
--- /dev/null
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.distributeservice
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.cluster.{Application, UserConfig}
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Demo app that remotely deploys and starts a system service on machines in the cluster */
+object DistributeService extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    LOG.info(s"Distribute Service submitting application...")
+    val context = ClientContext(akkaConf)
+    val appId = context.submit(Application[DistServiceAppMaster]("DistributedService",
+      UserConfig.empty))
+    context.close()
+    LOG.info(s"Distribute Service Application started with appId $appId !")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala
new file mode 100644
index 0000000..b2c8f11
--- /dev/null
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.distributeservice
+
+import java.io.File
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+import akka.pattern.ask
+import org.apache.commons.io.FileUtils
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
+import org.apache.gearpump.util.{AkkaApp, Constants}
+
+/** Client that uploads the service zip file and asks the AppMaster to install the service */
+object DistributeServiceClient extends AkkaApp with ArgumentsParser {
+  implicit val timeout = Constants.FUTURE_TIMEOUT
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "appid" -> CLIOption[Int]("<the distributed shell appid>", required = true),
+    "file" -> CLIOption[String]("<service zip file path>", required = true),
+    "script" -> CLIOption[String](
+      "<file path of service script that will be installed to /etc/init.d>", required = true),
+    "serviceName" -> CLIOption[String]("<service name>", required = true),
+    "target" -> CLIOption[String]("<target path on each machine>", required = true)
+  )
+
+  override def help(): Unit = {
+    super.help()
+    // scalastyle:off println
+    Console.err.println(s"-D<name>=<value> set a property to the service")
+    // scalastyle:on println
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(filterCustomOptions(args))
+    val context = ClientContext(akkaConf)
+    implicit val system = context.system
+    implicit val dispatcher = system.dispatcher
+    val appid = config.getInt("appid")
+    val zipFile = new File(config.getString("file"))
+    val script = new File(config.getString("script"))
+    val serviceName = config.getString("serviceName")
+    val appMaster = context.resolveAppID(appid)
+    (appMaster ? GetFileContainer).asInstanceOf[Future[FileContainer]].map { container =>
+      val bytes = FileUtils.readFileToByteArray(zipFile)
+      val result = FileServer.newClient.save(container.url, bytes)
+      result match {
+        case Success(_) =>
+          appMaster ! InstallService(container.url, zipFile.getName, config.getString("target"),
+            FileUtils.readFileToByteArray(script), serviceName, parseServiceConfig(args))
+          context.close()
+        case Failure(ex) => throw ex
+      }
+    }
+  }
+
+  private def filterCustomOptions(args: Array[String]): Array[String] = {
+    args.filter(!_.startsWith("-D"))
+  }
+
+  private def parseServiceConfig(args: Array[String]): Map[String, Any] = {
+    val result = Map.empty[String, Any]
+    args.foldLeft(result) { (result, argument) =>
+      if (argument.startsWith("-D") && argument.contains("=")) {
+        val fixedKV = argument.substring(2).split("=")
+        result + (fixedKV(0) -> fixedKV(1))
+      } else {
+        result
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala
new file mode 100644
index 0000000..4cd71de
--- /dev/null
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.distributeservice
+
+import java.io.File
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.{Actor, Stash}
+import akka.io.IO
+import org.apache.commons.httpclient.HttpClient
+import org.apache.commons.httpclient.methods.{ByteArrayRequestEntity, GetMethod, PostMethod}
+import spray.can.Http
+import spray.http.HttpMethods._
+import spray.http._
+
+import org.apache.gearpump.util.{FileUtils, LogUtil}
+
+/**
+ *
+ * Do not use this to serve large files (more than 100MB); otherwise OOM may happen.
+ *
+ * port: set port to 0 if you want to bind to random port
+ */
+class FileServer(rootDir: File, host: String, port: Int) extends Actor with Stash {
+  private val LOG = LogUtil.getLogger(getClass)
+
+  implicit val system = context.system
+
+  // Sender of Http.Bound, i.e. the spray-can listener; Http.Unbind must be addressed to it.
+  private var listener: Option[akka.actor.ActorRef] = None
+
+  override def preStart(): Unit = {
+    // Creates http server; the outcome arrives as Http.Bound and is handled in receive
+    IO(Http) ! Http.Bind(self, host, port)
+  }
+
+  override def postStop(): Unit = {
+    // Stop the server. Nothing to unbind when the bind was never confirmed.
+    listener.foreach(_ ! Http.Unbind)
+  }
+
+  /** Initial behavior: stashes every message until the bind is confirmed. */
+  override def receive: Receive = {
+    case Http.Bound(address) =>
+      listener = Some(sender())
+      LOG.info(s"FileServer bound on port: ${address.getPort}")
+      context.become(listen(address.getPort))
+      unstashAll()
+    case _ =>
+      stash()
+  }
+
+  /**
+   * Serving behavior, entered once the server is bound.
+   *
+   * @param port the port reported by Http.Bound; returned to FileServer.GetPort requests
+   */
+  def listen(port: Int): Receive = {
+    case FileServer.GetPort =>
+      sender ! FileServer.Port(port)
+    case Http.Connected(remote, _) =>
+      sender ! Http.Register(self)
+
+    // Fetches files from remote uri
+    case HttpRequest(GET, uri, _, _, _) =>
+      val child = uri.path.toString()
+      val payload = Try {
+        FileUtils.readFileToByteArray(resolve(child))
+      }
+      payload match {
+        case Success(data) =>
+          sender ! HttpResponse(entity = HttpEntity(data))
+        case Failure(ex) =>
+          LOG.error("failed to get file " + ex.getMessage)
+          sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = describe(ex))
+      }
+    // Save file to remote uri
+    case post@HttpRequest(POST, uri, _, _, _) =>
+      val child = uri.path.toString()
+
+      val status = Try {
+        val target = resolve(child)
+        val payload = post.entity.data.toByteArray
+        FileUtils.writeByteArrayToFile(target, payload)
+        "OK"
+      }
+      status match {
+        case Success(message) => sender ! HttpResponse(entity = message)
+        case Failure(ex) =>
+          LOG.error("save file failed " + ex.getMessage)
+          sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = describe(ex))
+      }
+  }
+
+  /**
+   * Maps a request path to a file below rootDir.
+   *
+   * Throws SecurityException when the normalized path leaves rootDir (for example through
+   * ".." segments), so that a request cannot read or overwrite files outside of it. Callers
+   * invoke this inside a Try, which turns the rejection into an error response.
+   */
+  private def resolve(child: String): File = {
+    val root = rootDir.toPath.toAbsolutePath.normalize
+    val resolved = new File(rootDir, child).toPath.toAbsolutePath.normalize
+    if (!resolved.startsWith(root)) {
+      throw new SecurityException(s"$child is outside of the served directory")
+    }
+    resolved.toFile
+  }
+
+  // Some exceptions (e.g. NullPointerException) carry no message; fall back to toString so
+  // that the response entity is never built from null.
+  private def describe(ex: Throwable): String = Option(ex.getMessage).getOrElse(ex.toString)
+}
+
+object FileServer {
+  /** Request message: asks a bound FileServer for its port. */
+  object GetPort
+
+  /** Reply to GetPort. */
+  case class Port(port: Int)
+
+  def newClient: Client = new Client
+
+  /** Blocking client for a FileServer, built on commons-httpclient. */
+  class Client {
+    val client = new HttpClient()
+
+    /**
+     * POSTs `data` to `uri`.
+     *
+     * @return the HTTP status code of the response, or a Failure when the request could not be
+     *         built or executed. The status code itself is not inspected here.
+     */
+    def save(uri: String, data: Array[Byte]): Try[Int] = {
+      Try {
+        val post = new PostMethod(uri)
+        try {
+          post.setRequestEntity(new ByteArrayRequestEntity(data))
+          client.executeMethod(post)
+        } finally {
+          // commons-httpclient requires the connection to be released after every execution.
+          post.releaseConnection()
+        }
+      }
+    }
+
+    /**
+     * GETs the content stored at `uri`.
+     *
+     * @return the response body when the server answers 200; a Failure for any other status
+     *         code, for a malformed uri, or when the request could not be executed.
+     */
+    def get(uri: String): Try[Array[Byte]] = {
+      Try {
+        // Built inside the Try so that a malformed uri yields a Failure, as it does in save.
+        val get = new GetMethod(uri)
+        try {
+          val code = client.executeMethod(get)
+          if (code == 200) {
+            // The body has to be read before the connection is released.
+            get.getResponseBody()
+          } else {
+            throw new Exception(s"We cannot get the data, the status code is $code")
+          }
+        } finally {
+          get.releaseConnection()
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
deleted file mode 100644
index 5bafef1..0000000
--- a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.distributeservice
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.testkit.{TestActorRef, TestProbe}
-import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
-
-import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource}
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList}
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
-import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig}
-import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer}
-import io.gearpump.util.ActorSystemBooter.RegisterActorSystem
-import io.gearpump.util.ActorUtil
-
-class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter {
-  implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG)
-  val mockMaster = TestProbe()(system)
-  val mockWorker1 = TestProbe()(system)
-  val client = TestProbe()(system)
-  val masterProxy = mockMaster.ref
-  val appId = 0
-  val userName = "test"
-  val masterExecutorId = 0
-  val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
-  val resource = Resource(1)
-  val appJar = None
-  val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName,
-    UserConfig.empty)
-
-  "DistService AppMaster" should {
-    "responsable for service distributing" in {
-      val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", mockWorker1.ref)
-      val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy,
-        appMasterInfo)
-      TestActorRef[DistServiceAppMaster](
-        AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext))
-      val registerAppMaster = mockMaster.receiveOne(15.seconds)
-      assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
-
-      val appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
-      mockMaster.reply(AppMasterRegistered(appId))
-      // The DistributedShell AppMaster will ask for worker list
-      mockMaster.expectMsg(GetAllWorkers)
-      mockMaster.reply(WorkerList(workerList))
-      // After worker list is ready, DistributedShell AppMaster will request resouce on each worker
-      workerList.foreach { workerId =>
-        mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId,
-          relaxation = Relaxation.SPECIFICWORKER)))
-      }
-      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref,
-        WorkerId(1, 0L)))))
-      mockWorker1.expectMsgClass(classOf[LaunchExecutor])
-      mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
-
-      appMaster.tell(GetFileContainer, client.ref)
-      client.expectMsgClass(15.seconds, classOf[FileContainer])
-    }
-  }
-
-  after {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
new file mode 100644
index 0000000..7516138
--- /dev/null
+++ b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.distributeservice
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.testkit.{TestActorRef, TestProbe}
+import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource}
+import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
+import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig}
+import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer}
+import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
+import org.apache.gearpump.util.ActorUtil
+
+class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter {
+  implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG)
+  val mockMaster = TestProbe()(system)
+  val mockWorker1 = TestProbe()(system)
+  val client = TestProbe()(system)
+  val masterProxy = mockMaster.ref
+  val appId = 0
+  val userName = "test"
+  val masterExecutorId = 0
+  val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
+  val resource = Resource(1)
+  val appJar = None
+  val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName,
+    UserConfig.empty)
+
+  "DistService AppMaster" should {
+    "responsable for service distributing" in {
+      val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", mockWorker1.ref)
+      val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy,
+        appMasterInfo)
+      TestActorRef[DistServiceAppMaster](
+        AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext))
+      val registerAppMaster = mockMaster.receiveOne(15.seconds)
+      assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
+
+      val appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
+      mockMaster.reply(AppMasterRegistered(appId))
+      // The DistService AppMaster will ask for the worker list
+      mockMaster.expectMsg(GetAllWorkers)
+      mockMaster.reply(WorkerList(workerList))
+      // After worker list is ready, DistService AppMaster will request resource on each worker
+      workerList.foreach { workerId =>
+        mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId,
+          relaxation = Relaxation.SPECIFICWORKER)))
+      }
+      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref,
+        WorkerId(1, 0L)))))
+      mockWorker1.expectMsgClass(classOf[LaunchExecutor])
+      mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
+
+      appMaster.tell(GetFileContainer, client.ref)
+      client.expectMsgClass(15.seconds, classOf[FileContainer])
+    }
+  }
+
+  after {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/README.md
----------------------------------------------------------------------
diff --git a/examples/pagerank/README.md b/examples/pagerank/README.md
index 20d9032..7fe9220 100644
--- a/examples/pagerank/README.md
+++ b/examples/pagerank/README.md
@@ -2,7 +2,7 @@
 
 After compile,
 ```scala
-bin\gear io.gearpump.experiments.pagerank.example.PageRankExample
+bin\gear org.apache.gearpump.experiments.pagerank.example.PageRankExample
 ```
 
 ### Syntax

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/resources/geardefault.conf b/examples/pagerank/src/main/resources/geardefault.conf
index d2a4e16..67275f9 100644
--- a/examples/pagerank/src/main/resources/geardefault.conf
+++ b/examples/pagerank/src/main/resources/geardefault.conf
@@ -1,7 +1,7 @@
 gearpump {
   serializers {
-    "io.gearpump.experiments.pagerank.PageRankController$Tick" = ""
-    "io.gearpump.experiments.pagerank.PageRankWorker$UpdateWeight" = ""
-    "io.gearpump.experiments.pagerank.PageRankWorker$LatestWeight" = ""
+    "org.apache.gearpump.experiments.pagerank.PageRankController$Tick" = ""
+    "org.apache.gearpump.experiments.pagerank.PageRankWorker$UpdateWeight" = ""
+    "org.apache.gearpump.experiments.pagerank.PageRankWorker$LatestWeight" = ""
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
deleted file mode 100644
index 2e37091..0000000
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.pagerank
-
-import akka.actor.ActorSystem
-
-import io.gearpump.cluster.{Application, ApplicationMaster, UserConfig}
-import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.appmaster.AppMaster
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph.Node
-
-/**
- *
- * A simple and naive pagerank implementation.
- *
- * @param name name of the application
- * @param iteration max iteration count
- * @param delta decide the accuracy when the page rank example stops.
- * @param dag  the page rank graph
- */
-class PageRankApplication[T](
-    override val name: String, iteration: Int, delta: Double, dag: Graph[T, _])
-  extends Application {
-
-  override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster]
-  override def userConfig(implicit system: ActorSystem): UserConfig = {
-
-    // Map node with taskId
-    var taskId = 0
-    val pageRankDag = dag.mapVertex { node =>
-      val updatedNode = NodeWithTaskId(taskId, node)
-      taskId += 1
-      updatedNode
-    }
-
-    val taskCount = taskId
-
-    val userConfig = UserConfig.empty.withValue(PageRankApplication.DAG, pageRankDag).
-      withInt(PageRankApplication.ITERATION, iteration).
-      withInt(PageRankApplication.COUNT, taskCount).
-      withDouble(PageRankApplication.DELTA, delta)
-
-    val controller = Processor[PageRankController](1)
-    val pageRankWorker = Processor[PageRankWorker](taskCount)
-    val partitioner = new HashPartitioner
-
-    val app = StreamApplication(name, Graph(controller ~ partitioner ~> pageRankWorker), userConfig)
-    app.userConfig
-  }
-}
-
-object PageRankApplication {
-  val DAG = "PageRank.DAG"
-  val ITERATION = "PageRank.Iteration"
-  val COUNT = "PageRank.COUNT"
-  val DELTA = "PageRank.DELTA"
-  val REPORTER = "PageRank.Reporter"
-  case class NodeWithTaskId[T](taskId: Int, node: T)
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
deleted file mode 100644
index 0fb689d..0000000
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.pagerank
-
-import akka.actor.Actor.Receive
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.pagerank.PageRankController.Tick
-import io.gearpump.experiments.pagerank.PageRankWorker.LatestWeight
-import io.gearpump.streaming.task._
-
-class PageRankController(taskContext: TaskContext, conf: UserConfig)
-  extends Task(taskContext, conf) {
-
-  val taskCount = conf.getInt(PageRankApplication.COUNT).get
-  val iterationMax = conf.getInt(PageRankApplication.ITERATION).get
-  val delta = conf.getDouble(PageRankApplication.DELTA).get
-
-  val tasks = (0 until taskCount).toList.map(TaskId(1, _))
-
-  var tick: Int = 0
-  var receivedWeightForCurrentTick = 0
-
-  var weights = Map.empty[TaskId, Double]
-  var deltas = Map.empty[TaskId, Double]
-
-  override def onStart(startTime: StartTime): Unit = {
-    output(Tick(tick), tasks: _*)
-  }
-
-  private def output(msg: AnyRef, tasks: TaskId*): Unit = {
-    taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case LatestWeight(taskId, weight, replyTick) =>
-      if (this.tick == replyTick) {
-
-        deltas += taskId -> Math.abs(weight - weights.getOrElse(taskId, 0.0))
-        weights += taskId -> weight
-        receivedWeightForCurrentTick += 1
-        if (receivedWeightForCurrentTick == taskCount) {
-          this.tick += 1
-          receivedWeightForCurrentTick = 0
-          if (continueIteration) {
-            LOG.debug(s"next iteration: $tick, weight: $weights, delta: $deltas")
-            output(Tick(tick), tasks: _*)
-          } else {
-            LOG.info(s"iterations: $tick, weight: $weights, delta: $deltas")
-          }
-        }
-      }
-  }
-
-  private def continueIteration: Boolean = {
-    (tick < iterationMax) && deltas.values.foldLeft(false) { (deltaExceed, value) =>
-      deltaExceed || value > delta
-    }
-  }
-}
-
-object PageRankController {
-  case class Tick(iteration: Int)
-}