You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:44 UTC
[34/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala
deleted file mode 100644
index 6eeba58..0000000
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistributedShellSpec.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import com.typesafe.config.Config
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class DistributedShellSpec
- extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
-
- before {
- startActorSystem()
- }
-
- after {
- shutdownActorSystem()
- }
-
- override def config: Config = TestUtil.DEFAULT_CONFIG
-
- property("DistributedShell should succeed to submit application with required arguments") {
- val requiredArgs = Array.empty[String]
-
- val masterReceiver = createMockMaster()
-
- Future {
- DistributedShell.main(masterConfig, requiredArgs)
- }
-
- masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
- masterReceiver.reply(SubmitApplicationResult(Success(0)))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
deleted file mode 100644
index d59981b..0000000
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
-
-import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommandResult, ShellCommandResultAggregator}
-
-class ShellCommandResultAggregatorSpec extends WordSpec with Matchers with BeforeAndAfter {
- "ShellCommandResultAggregator" should {
- "aggregate ShellCommandResult" in {
- val executorId1 = 1
- val executorId2 = 2
- val responseBuilder = new ShellCommandResultAggregator
- val response1 = ShellCommandResult(executorId1, "task1")
- val response2 = ShellCommandResult(executorId2, "task2")
- val result = responseBuilder.aggregate(response1).aggregate(response2).toString()
- val expected = s"Execute results from executor $executorId1 : \ntask1\n" +
- s"Execute results from executor $executorId2 : \ntask2\n"
- assert(result == expected)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
deleted file mode 100644
index b301973..0000000
--- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.examples.distributedshell
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.sys.process._
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.cluster.appmaster.WorkerInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig}
-import io.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, ShellCommandResult}
-
-class ShellExecutorSpec extends WordSpec with Matchers {
-
- "ShellExecutor" should {
- "execute the shell command and return the result" in {
- val executorId = 1
- val workerId = WorkerId(2, 0L)
- val appId = 0
- val appName = "app"
- val resource = Resource(1)
- implicit val system = ActorSystem("ShellExecutor", TestUtil.DEFAULT_CONFIG)
- val mockMaster = TestProbe()(system)
- val worker = TestProbe()
- val workerInfo = WorkerInfo(workerId, worker.ref)
- val executorContext = ExecutorContext(executorId, workerInfo, appId, appName,
- mockMaster.ref, resource)
- val executor = system.actorOf(Props(classOf[ShellExecutor], executorContext,
- UserConfig.empty))
-
- val process = Try(s"ls /".!!)
- val result = process match {
- case Success(msg) => msg
- case Failure(ex) => ex.getMessage
- }
- executor.tell(ShellCommand("ls /"), mockMaster.ref)
- assert(mockMaster.receiveN(1).head.asInstanceOf[ShellCommandResult].equals(
- ShellCommandResult(executorId, result)))
-
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
new file mode 100644
index 0000000..e22abaf
--- /dev/null
+++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import akka.testkit.{TestActorRef, TestProbe}
+import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource}
+import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList}
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
+import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
+import org.apache.gearpump.util.ActorUtil
+
+class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter {
+ implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG)
+ val mockMaster = TestProbe()(system)
+ val mockWorker1 = TestProbe()(system)
+ val masterProxy = mockMaster.ref
+ val appId = 0
+ val userName = "test"
+ val masterExecutorId = 0
+ val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
+ val resource = Resource(1)
+ val appJar = None
+ val appDescription = AppDescription("app0", classOf[DistShellAppMaster].getName, UserConfig.empty)
+
+ "DistributedShell AppMaster" should {
+ "launch one ShellTask on each worker" in {
+ val appMasterInfo = AppMasterRuntimeInfo(appId, appName = appId.toString)
+ val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar,
+ masterProxy, appMasterInfo)
+ TestActorRef[DistShellAppMaster](
+ AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext))
+ mockMaster.expectMsgType[RegisterAppMaster]
+ mockMaster.reply(AppMasterRegistered(appId))
+ // The DistributedShell AppMaster asks for worker list from Master.
+ mockMaster.expectMsg(GetAllWorkers)
+ mockMaster.reply(WorkerList(workerList))
+ // After worker list is ready, DistributedShell AppMaster requests resource on each worker
+ workerList.foreach { workerId =>
+ mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId,
+ relaxation = Relaxation.SPECIFICWORKER)))
+ }
+ mockMaster.reply(ResourceAllocated(
+ Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
+ mockWorker1.expectMsgClass(classOf[LaunchExecutor])
+ mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
+ }
+ }
+
+ after {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
new file mode 100644
index 0000000..7cfd07a
--- /dev/null
+++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellClientSpec.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.ResolveAppId
+import org.apache.gearpump.cluster.MasterToClient.ResolveAppIdResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommand
+import org.apache.gearpump.util.LogUtil
+
+class DistributedShellClientSpec
+ extends PropSpec with Matchers with BeforeAndAfter with MasterHarness {
+
+ private val LOG = LogUtil.getLogger(getClass)
+
+ before {
+ startActorSystem()
+ }
+
+ after {
+ shutdownActorSystem()
+ }
+
+ protected override def config = TestUtil.DEFAULT_CONFIG
+
+ property("DistributedShellClient should succeed to submit application with required arguments") {
+ val command = "ls /"
+ val requiredArgs = Array("-appid", "0", "-command", command)
+ val masterReceiver = createMockMaster()
+
+ assert(Try(DistributedShellClient.main(Array.empty[String])).isFailure,
+ "missing required arguments, print usage")
+
+ Future {
+ DistributedShellClient.main(masterConfig, requiredArgs)
+ }
+
+ masterReceiver.expectMsg(PROCESS_BOOT_TIME, ResolveAppId(0))
+ val mockAppMaster = TestProbe()(getActorSystem)
+ masterReceiver.reply(ResolveAppIdResult(Success(mockAppMaster.ref)))
+ LOG.info(s"Reply back ResolveAppIdResult, current actorRef: ${mockAppMaster.ref.path.toString}")
+ mockAppMaster.expectMsg(PROCESS_BOOT_TIME, ShellCommand(command))
+ mockAppMaster.reply("result")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala
new file mode 100644
index 0000000..51b5ec3
--- /dev/null
+++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistributedShellSpec.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+
+class DistributedShellSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+
+ before {
+ startActorSystem()
+ }
+
+ after {
+ shutdownActorSystem()
+ }
+
+ override def config: Config = TestUtil.DEFAULT_CONFIG
+
+ property("DistributedShell should succeed to submit application with required arguments") {
+ val requiredArgs = Array.empty[String]
+
+ val masterReceiver = createMockMaster()
+
+ Future {
+ DistributedShell.main(masterConfig, requiredArgs)
+ }
+
+ masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+ masterReceiver.reply(SubmitApplicationResult(Success(0)))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
new file mode 100644
index 0000000..11350a6
--- /dev/null
+++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellCommandResultAggregatorSpec.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
+
+import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommandResult, ShellCommandResultAggregator}
+
+class ShellCommandResultAggregatorSpec extends WordSpec with Matchers with BeforeAndAfter {
+ "ShellCommandResultAggregator" should {
+ "aggregate ShellCommandResult" in {
+ val executorId1 = 1
+ val executorId2 = 2
+ val responseBuilder = new ShellCommandResultAggregator
+ val response1 = ShellCommandResult(executorId1, "task1")
+ val response2 = ShellCommandResult(executorId2, "task2")
+ val result = responseBuilder.aggregate(response1).aggregate(response2).toString()
+ val expected = s"Execute results from executor $executorId1 : \ntask1\n" +
+ s"Execute results from executor $executorId2 : \ntask2\n"
+ assert(result == expected)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala
new file mode 100644
index 0000000..e7a3a21
--- /dev/null
+++ b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/ShellExecutorSpec.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.examples.distributedshell
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.sys.process._
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.scalatest.{Matchers, WordSpec}
+
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig}
+import org.apache.gearpump.examples.distributedshell.DistShellAppMaster.{ShellCommand, ShellCommandResult}
+
+class ShellExecutorSpec extends WordSpec with Matchers {
+
+ "ShellExecutor" should {
+ "execute the shell command and return the result" in {
+ val executorId = 1
+ val workerId = WorkerId(2, 0L)
+ val appId = 0
+ val appName = "app"
+ val resource = Resource(1)
+ implicit val system = ActorSystem("ShellExecutor", TestUtil.DEFAULT_CONFIG)
+ val mockMaster = TestProbe()(system)
+ val worker = TestProbe()
+ val workerInfo = WorkerInfo(workerId, worker.ref)
+ val executorContext = ExecutorContext(executorId, workerInfo, appId, appName,
+ mockMaster.ref, resource)
+ val executor = system.actorOf(Props(classOf[ShellExecutor], executorContext,
+ UserConfig.empty))
+
+ val process = Try(s"ls /".!!)
+ val result = process match {
+ case Success(msg) => msg
+ case Failure(ex) => ex.getMessage
+ }
+ executor.tell(ShellCommand("ls /"), mockMaster.ref)
+ assert(mockMaster.receiveN(1).head.asInstanceOf[ShellCommandResult].equals(
+ ShellCommandResult(executorId, result)))
+
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/README.md
----------------------------------------------------------------------
diff --git a/examples/distributeservice/README.md b/examples/distributeservice/README.md
index 82b3726..65d41f6 100644
--- a/examples/distributeservice/README.md
+++ b/examples/distributeservice/README.md
@@ -6,12 +6,12 @@ In order to run the example:
2. Start the AppMaster:<br>
```bash
- target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar io.gearpump.distributeservice.DistributeService
+ target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar org.apache.gearpump.distributeservice.DistributeService
```
3. Distribute the service:<br>
```bash
target/pack/bin/gear app -jar experiments/distributeservice/target/$SCALA_VERSION_MAJOR/gearpump-experiments-distributeservice_$VERSION.jar
- io.gearpump.distributeservice.DistributeServiceClient -appid $APPID -file ${File_Path}
+ org.apache.gearpump.distributeservice.DistributeServiceClient -appid $APPID -file ${File_Path}
-script ${Script_Path} -serviceName ${Service_Name} -target ${Target_Path} -Dkey1=value1 -Dkey2=value2
```<br>
This command will distribute the service zip file(variable ```file```) to the target path(variable ```target```), then copy the script to
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
deleted file mode 100644
index a220dc6..0000000
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.distributeservice
-
-import java.io.File
-import scala.concurrent.Future
-
-import akka.actor.{Deploy, Props}
-import akka.pattern.{ask, pipe}
-import akka.remote.RemoteScope
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClientToMaster.ShutdownApplication
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout}
-import io.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext}
-import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
-import io.gearpump.util._
-
-class DistServiceAppMaster(appContext: AppMasterContext, app: AppDescription)
- extends ApplicationMaster {
- import appContext._
- import context.dispatcher
- implicit val timeout = Constants.FUTURE_TIMEOUT
- private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
- private var currentExecutorId = 0
- private var fileServerPort = -1
-
- val rootDirectory = new File("/")
- val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
- val server = context.actorOf(Props(classOf[FileServer], rootDirectory, host, 0))
-
- override def preStart(): Unit = {
- LOG.info(s"Distribute Service AppMaster started")
- ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, self)
- }
-
- (server ? FileServer.GetPort).asInstanceOf[Future[FileServer.Port]] pipeTo self
-
- override def receive: Receive = {
- case ExecutorSystemStarted(executorSystem, _) =>
- import executorSystem.{address, resource => executorResource, worker}
- val executorContext = ExecutorContext(currentExecutorId, worker,
- appId, app.name, self, executorResource)
- // start executor
- val executor = context.actorOf(Props(classOf[DistServiceExecutor],
- executorContext, app.userConfig).withDeploy(
- Deploy(scope = RemoteScope(address))), currentExecutorId.toString)
- executorSystem.bindLifeCycleWith(executor)
- currentExecutorId += 1
- case StartExecutorSystemTimeout =>
- LOG.error(s"Failed to allocate resource in time")
- masterProxy ! ShutdownApplication(appId)
- context.stop(self)
- case FileServer.Port(port) =>
- this.fileServerPort = port
- case GetFileContainer =>
- val name = Math.abs(new java.util.Random().nextLong()).toString
- sender ! new FileContainer(s"http://$host:$fileServerPort/$name")
- case installService: InstallService =>
- context.children.foreach(_ ! installService)
- }
-
- private def getExecutorJvmConfig: ExecutorSystemJvmConfig = {
- val config: Config = app.clusterConfig
- val jvmSetting = Util.resolveJvmSetting(
- config.withFallback(context.system.settings.config)).executor
- ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs,
- appJar, username, config)
- }
-}
-
-object DistServiceAppMaster {
- case object GetFileContainer
-
- case class FileContainer(url: String)
-
- case class InstallService(
- url: String,
- zipFileName: String,
- targetPath: String,
- script: Array[Byte],
- serviceName: String,
- serviceSettings: Map[String, Any])
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala
deleted file mode 100644
index 4a2a876..0000000
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistServiceExecutor.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.distributeservice
-
-import java.io.{File, FileWriter}
-import java.net.InetAddress
-import scala.collection.JavaConverters._
-import scala.io.Source
-import scala.sys.process._
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.Actor
-import org.apache.commons.io.FileUtils
-import org.apache.commons.lang.text.StrSubstitutor
-import org.slf4j.Logger
-
-import io.gearpump.cluster.{ExecutorContext, UserConfig}
-import io.gearpump.experiments.distributeservice.DistServiceAppMaster.InstallService
-import io.gearpump.util.{ActorUtil, LogUtil}
-
-class DistServiceExecutor(executorContext: ExecutorContext, userConf: UserConfig) extends Actor {
- import executorContext._
- private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)
-
- override def receive: Receive = {
- case InstallService(url, zipFileName, targetPath, scriptData, serviceName, serviceSettings) =>
- LOG.info(s"Executor $executorId receive command to install " +
- s"service $serviceName to $targetPath")
- unzipFile(url, zipFileName, targetPath)
- installService(scriptData, serviceName, serviceSettings)
- }
-
- private def unzipFile(url: String, zipFileName: String, targetPath: String) = {
- val zipFile = File.createTempFile(System.currentTimeMillis().toString, zipFileName)
- val dir = new File(targetPath)
- if (dir.exists()) {
- FileUtils.forceDelete(dir)
- }
- val bytes = FileServer.newClient.get(url).get
- FileUtils.writeByteArrayToFile(zipFile, bytes)
- val result = Try(s"unzip ${zipFile.getAbsolutePath} -d $targetPath".!!)
- result match {
- case Success(msg) => LOG.info(s"Executor $executorId unzip file to $targetPath")
- case Failure(ex) => throw ex
- }
- }
-
- private def installService(
- scriptData: Array[Byte], serviceName: String, serviceSettings: Map[String, Any]) = {
- val tempFile = File.createTempFile("gearpump", serviceName)
- FileUtils.writeByteArrayToFile(tempFile, scriptData)
- val script = new File("/etc/init.d", serviceName)
- writeFileWithEnvVariables(tempFile, script, serviceSettings ++ getEnvSettings)
- val result = Try(s"chkconfig --add $serviceName".!!)
- result match {
- case Success(msg) => LOG.info(s"Executor install service $serviceName successfully!")
- case Failure(ex) => throw ex
- }
- }
-
- private def getEnvSettings: Map[String, Any] = {
- Map("workerId" -> worker,
- "localhost" -> ActorUtil.getSystemAddress(context.system).host.get,
- "hostname" -> InetAddress.getLocalHost.getHostName)
- }
-
- private def writeFileWithEnvVariables(source: File, target: File, envs: Map[String, Any]) = {
- val writer = new FileWriter(target)
- val sub = new StrSubstitutor(envs.asJava)
- sub.setEnableSubstitutionInVariables(true)
- Source.fromFile(source).getLines().foreach(line => writer.write(sub.replace(line) + "\r\n"))
- writer.close()
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala
deleted file mode 100644
index 522dc5e..0000000
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeService.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.distributeservice
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.cluster.{Application, UserConfig}
-import io.gearpump.util.{AkkaApp, LogUtil}
-
-/** Demo app to remotely deploy and start system service on machines in the cluster */
-object DistributeService extends AkkaApp with ArgumentsParser {
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] = Array.empty
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- LOG.info(s"Distribute Service submitting application...")
- val context = ClientContext(akkaConf)
- val appId = context.submit(Application[DistServiceAppMaster]("DistributedService",
- UserConfig.empty))
- context.close()
- LOG.info(s"Distribute Service Application started with appId $appId !")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala
deleted file mode 100644
index 0d85001..0000000
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/DistributeServiceClient.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.distributeservice
-
-import java.io.File
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-import akka.pattern.ask
-import org.apache.commons.io.FileUtils
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
-import io.gearpump.util.{AkkaApp, Constants}
-
-/** Client to submit the service jar */
-object DistributeServiceClient extends AkkaApp with ArgumentsParser {
- implicit val timeout = Constants.FUTURE_TIMEOUT
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "appid" -> CLIOption[Int]("<the distributed shell appid>", required = true),
- "file" -> CLIOption[String]("<service zip file path>", required = true),
- "script" -> CLIOption[String](
- "<file path of service script that will be installed to /etc/init.d>", required = true),
- "serviceName" -> CLIOption[String]("<service name>", required = true),
- "target" -> CLIOption[String]("<target path on each machine>", required = true)
- )
-
- override def help(): Unit = {
- super.help()
- // scalastyle:off println
- Console.err.println(s"-D<name>=<value> set a property to the service")
- // scalastyle:on println
- }
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- val config = parse(filterCustomOptions(args))
- val context = ClientContext(akkaConf)
- implicit val system = context.system
- implicit val dispatcher = system.dispatcher
- val appid = config.getInt("appid")
- val zipFile = new File(config.getString("file"))
- val script = new File(config.getString("script"))
- val serviceName = config.getString("serviceName")
- val appMaster = context.resolveAppID(appid)
- (appMaster ? GetFileContainer).asInstanceOf[Future[FileContainer]].map { container =>
- val bytes = FileUtils.readFileToByteArray(zipFile)
- val result = FileServer.newClient.save(container.url, bytes)
- result match {
- case Success(_) =>
- appMaster ! InstallService(container.url, zipFile.getName, config.getString("target"),
- FileUtils.readFileToByteArray(script), serviceName, parseServiceConfig(args))
- context.close()
- case Failure(ex) => throw ex
- }
- }
- }
-
- private def filterCustomOptions(args: Array[String]): Array[String] = {
- args.filter(!_.startsWith("-D"))
- }
-
- private def parseServiceConfig(args: Array[String]): Map[String, Any] = {
- val result = Map.empty[String, Any]
- args.foldLeft(result) { (result, argument) =>
- if (argument.startsWith("-D") && argument.contains("=")) {
- val fixedKV = argument.substring(2).split("=")
- result + (fixedKV(0) -> fixedKV(1))
- } else {
- result
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala b/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala
deleted file mode 100644
index ed0b24d..0000000
--- a/examples/distributeservice/src/main/scala/io/gearpump/experiments/distributeservice/FileServer.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.distributeservice
-
-import java.io.File
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.{Actor, Stash}
-import akka.io.IO
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.{ByteArrayRequestEntity, GetMethod, PostMethod}
-import spray.can.Http
-import spray.http.HttpMethods._
-import spray.http._
-
-import io.gearpump.util.{FileUtils, LogUtil}
-
-/**
- *
- * Should not use this to server too big files(more than 100MB), otherwise OOM may happen.
- *
- * port: set port to 0 if you want to bind to random port
- */
-class FileServer(rootDir: File, host: String, port: Int) extends Actor with Stash {
- private val LOG = LogUtil.getLogger(getClass)
-
- implicit val system = context.system
-
- override def preStart(): Unit = {
- // Creates http server
- IO(Http) ! Http.Bind(self, host, port)
- }
-
- override def postStop(): Unit = {
- // Stop the server
- IO(Http) ! Http.Unbind
- }
-
- override def receive: Receive = {
- case Http.Bound(address) =>
- LOG.info(s"FileServer bound on port: ${address.getPort}")
- context.become(listen(address.getPort))
- unstashAll()
- case _ =>
- stash()
- }
-
- def listen(port: Int): Receive = {
- case FileServer.GetPort => {
- sender ! FileServer.Port(port)
- }
- case Http.Connected(remote, _) =>
- sender ! Http.Register(self)
-
- // Fetches files from remote uri
- case HttpRequest(GET, uri, _, _, _) =>
- val child = uri.path.toString()
- val payload = Try {
- val source = new File(rootDir, child)
- FileUtils.readFileToByteArray(source)
- }
- payload match {
- case Success(data) =>
- sender ! HttpResponse(entity = HttpEntity(data))
- case Failure(ex) =>
- LOG.error("failed to get file " + ex.getMessage)
- sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage)
- }
- // Save file to remote uri
- case post@HttpRequest(POST, uri, _, _, _) =>
- val child = uri.path.toString()
-
- val status = Try {
- val target = new File(rootDir, child)
- val payload = post.entity.data.toByteArray
- FileUtils.writeByteArrayToFile(target, payload)
- "OK"
- }
- status match {
- case Success(message) => sender ! HttpResponse(entity = message)
- case Failure(ex) =>
- LOG.error("save file failed " + ex.getMessage)
- sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage)
- }
- }
-}
-
-object FileServer {
- object GetPort
- case class Port(port: Int)
-
- def newClient: Client = new Client
-
- class Client {
- val client = new HttpClient()
-
- def save(uri: String, data: Array[Byte]): Try[Int] = {
- Try {
- val post = new PostMethod(uri)
- val entity = new ByteArrayRequestEntity(data)
- post.setRequestEntity(entity)
- client.executeMethod(post)
- }
- }
-
- def get(uri: String): Try[Array[Byte]] = {
- val get = new GetMethod(uri)
- val status = Try {
- client.executeMethod(get)
- }
-
- val data = status.flatMap { code =>
- if (code == 200) {
- Success(get.getResponseBody())
- } else {
- Failure(new Exception(s"We cannot get the data, the status code is $code"))
- }
- }
- data
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
new file mode 100644
index 0000000..ca0ab49
--- /dev/null
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMaster.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.distributeservice
+
+import java.io.File
+import scala.concurrent.Future
+
+import akka.actor.{Deploy, Props}
+import akka.pattern.{ask, pipe}
+import akka.remote.RemoteScope
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClientToMaster.ShutdownApplication
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout}
+import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, ApplicationMaster, ExecutorContext}
+import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
+import org.apache.gearpump.util._
+
+class DistServiceAppMaster(appContext: AppMasterContext, app: AppDescription)
+ extends ApplicationMaster {
+ import appContext._
+ import context.dispatcher
+ implicit val timeout = Constants.FUTURE_TIMEOUT
+ private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+ private var currentExecutorId = 0
+ private var fileServerPort = -1
+
+ val rootDirectory = new File("/")
+ val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
+ val server = context.actorOf(Props(classOf[FileServer], rootDirectory, host, 0))
+
+ override def preStart(): Unit = {
+ LOG.info(s"Distribute Service AppMaster started")
+ ActorUtil.launchExecutorOnEachWorker(masterProxy, getExecutorJvmConfig, self)
+ }
+
+ (server ? FileServer.GetPort).asInstanceOf[Future[FileServer.Port]] pipeTo self
+
+ override def receive: Receive = {
+ case ExecutorSystemStarted(executorSystem, _) =>
+ import executorSystem.{address, resource => executorResource, worker}
+ val executorContext = ExecutorContext(currentExecutorId, worker,
+ appId, app.name, self, executorResource)
+ // start executor
+ val executor = context.actorOf(Props(classOf[DistServiceExecutor],
+ executorContext, app.userConfig).withDeploy(
+ Deploy(scope = RemoteScope(address))), currentExecutorId.toString)
+ executorSystem.bindLifeCycleWith(executor)
+ currentExecutorId += 1
+ case StartExecutorSystemTimeout =>
+ LOG.error(s"Failed to allocate resource in time")
+ masterProxy ! ShutdownApplication(appId)
+ context.stop(self)
+ case FileServer.Port(port) =>
+ this.fileServerPort = port
+ case GetFileContainer =>
+ val name = Math.abs(new java.util.Random().nextLong()).toString
+ sender ! new FileContainer(s"http://$host:$fileServerPort/$name")
+ case installService: InstallService =>
+ context.children.foreach(_ ! installService)
+ }
+
+ private def getExecutorJvmConfig: ExecutorSystemJvmConfig = {
+ val config: Config = app.clusterConfig
+ val jvmSetting = Util.resolveJvmSetting(
+ config.withFallback(context.system.settings.config)).executor
+ ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs,
+ appJar, username, config)
+ }
+}
+
+object DistServiceAppMaster {
+ case object GetFileContainer
+
+ case class FileContainer(url: String)
+
+ case class InstallService(
+ url: String,
+ zipFileName: String,
+ targetPath: String,
+ script: Array[Byte],
+ serviceName: String,
+ serviceSettings: Map[String, Any])
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala
new file mode 100644
index 0000000..248156f
--- /dev/null
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistServiceExecutor.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.distributeservice
+
+import java.io.{File, FileWriter}
+import java.net.InetAddress
+import scala.collection.JavaConverters._
+import scala.io.Source
+import scala.sys.process._
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.Actor
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang.text.StrSubstitutor
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.{ExecutorContext, UserConfig}
+import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.InstallService
+import org.apache.gearpump.util.{ActorUtil, LogUtil}
+
+class DistServiceExecutor(executorContext: ExecutorContext, userConf: UserConfig) extends Actor {
+ import executorContext._
+ private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)
+
+ override def receive: Receive = {
+ case InstallService(url, zipFileName, targetPath, scriptData, serviceName, serviceSettings) =>
+ LOG.info(s"Executor $executorId receive command to install " +
+ s"service $serviceName to $targetPath")
+ unzipFile(url, zipFileName, targetPath)
+ installService(scriptData, serviceName, serviceSettings)
+ }
+
+ private def unzipFile(url: String, zipFileName: String, targetPath: String) = {
+ val zipFile = File.createTempFile(System.currentTimeMillis().toString, zipFileName)
+ val dir = new File(targetPath)
+ if (dir.exists()) {
+ FileUtils.forceDelete(dir)
+ }
+ val bytes = FileServer.newClient.get(url).get
+ FileUtils.writeByteArrayToFile(zipFile, bytes)
+ val result = Try(s"unzip ${zipFile.getAbsolutePath} -d $targetPath".!!)
+ result match {
+ case Success(msg) => LOG.info(s"Executor $executorId unzip file to $targetPath")
+ case Failure(ex) => throw ex
+ }
+ }
+
+ private def installService(
+ scriptData: Array[Byte], serviceName: String, serviceSettings: Map[String, Any]) = {
+ val tempFile = File.createTempFile("gearpump", serviceName)
+ FileUtils.writeByteArrayToFile(tempFile, scriptData)
+ val script = new File("/etc/init.d", serviceName)
+ writeFileWithEnvVariables(tempFile, script, serviceSettings ++ getEnvSettings)
+ val result = Try(s"chkconfig --add $serviceName".!!)
+ result match {
+ case Success(msg) => LOG.info(s"Executor install service $serviceName successfully!")
+ case Failure(ex) => throw ex
+ }
+ }
+
+ private def getEnvSettings: Map[String, Any] = {
+ Map("workerId" -> worker,
+ "localhost" -> ActorUtil.getSystemAddress(context.system).host.get,
+ "hostname" -> InetAddress.getLocalHost.getHostName)
+ }
+
+ private def writeFileWithEnvVariables(source: File, target: File, envs: Map[String, Any]) = {
+ val writer = new FileWriter(target)
+ val sub = new StrSubstitutor(envs.asJava)
+ sub.setEnableSubstitutionInVariables(true)
+ Source.fromFile(source).getLines().foreach(line => writer.write(sub.replace(line) + "\r\n"))
+ writer.close()
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
new file mode 100644
index 0000000..df7a517
--- /dev/null
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.distributeservice
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.cluster.{Application, UserConfig}
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Demo app to remotely deploy and start system service on machines in the cluster */
+object DistributeService extends AkkaApp with ArgumentsParser {
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ LOG.info(s"Distribute Service submitting application...")
+ val context = ClientContext(akkaConf)
+ val appId = context.submit(Application[DistServiceAppMaster]("DistributedService",
+ UserConfig.empty))
+ context.close()
+ LOG.info(s"Distribute Service Application started with appId $appId !")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala
new file mode 100644
index 0000000..b2c8f11
--- /dev/null
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeServiceClient.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.distributeservice
+
+import java.io.File
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+import akka.pattern.ask
+import org.apache.commons.io.FileUtils
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer, InstallService}
+import org.apache.gearpump.util.{AkkaApp, Constants}
+
+/** Client to submit the service jar */
+object DistributeServiceClient extends AkkaApp with ArgumentsParser {
+ implicit val timeout = Constants.FUTURE_TIMEOUT
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "appid" -> CLIOption[Int]("<the distributed shell appid>", required = true),
+ "file" -> CLIOption[String]("<service zip file path>", required = true),
+ "script" -> CLIOption[String](
+ "<file path of service script that will be installed to /etc/init.d>", required = true),
+ "serviceName" -> CLIOption[String]("<service name>", required = true),
+ "target" -> CLIOption[String]("<target path on each machine>", required = true)
+ )
+
+ override def help(): Unit = {
+ super.help()
+ // scalastyle:off println
+ Console.err.println(s"-D<name>=<value> set a property to the service")
+ // scalastyle:on println
+ }
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(filterCustomOptions(args))
+ val context = ClientContext(akkaConf)
+ implicit val system = context.system
+ implicit val dispatcher = system.dispatcher
+ val appid = config.getInt("appid")
+ val zipFile = new File(config.getString("file"))
+ val script = new File(config.getString("script"))
+ val serviceName = config.getString("serviceName")
+ val appMaster = context.resolveAppID(appid)
+ (appMaster ? GetFileContainer).asInstanceOf[Future[FileContainer]].map { container =>
+ val bytes = FileUtils.readFileToByteArray(zipFile)
+ val result = FileServer.newClient.save(container.url, bytes)
+ result match {
+ case Success(_) =>
+ appMaster ! InstallService(container.url, zipFile.getName, config.getString("target"),
+ FileUtils.readFileToByteArray(script), serviceName, parseServiceConfig(args))
+ context.close()
+ case Failure(ex) => throw ex
+ }
+ }
+ }
+
+ private def filterCustomOptions(args: Array[String]): Array[String] = {
+ args.filter(!_.startsWith("-D"))
+ }
+
+ private def parseServiceConfig(args: Array[String]): Map[String, Any] = {
+ val result = Map.empty[String, Any]
+ args.foldLeft(result) { (result, argument) =>
+ if (argument.startsWith("-D") && argument.contains("=")) {
+ val fixedKV = argument.substring(2).split("=")
+ result + (fixedKV(0) -> fixedKV(1))
+ } else {
+ result
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala
new file mode 100644
index 0000000..4cd71de
--- /dev/null
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/FileServer.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.distributeservice
+
+import java.io.File
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.{Actor, Stash}
+import akka.io.IO
+import org.apache.commons.httpclient.HttpClient
+import org.apache.commons.httpclient.methods.{ByteArrayRequestEntity, GetMethod, PostMethod}
+import spray.can.Http
+import spray.http.HttpMethods._
+import spray.http._
+
+import org.apache.gearpump.util.{FileUtils, LogUtil}
+
+/**
+ *
+ * Should not use this to server too big files(more than 100MB), otherwise OOM may happen.
+ *
+ * port: set port to 0 if you want to bind to random port
+ */
+class FileServer(rootDir: File, host: String, port: Int) extends Actor with Stash {
+ private val LOG = LogUtil.getLogger(getClass)
+
+ implicit val system = context.system
+
+ override def preStart(): Unit = {
+ // Creates http server
+ IO(Http) ! Http.Bind(self, host, port)
+ }
+
+ override def postStop(): Unit = {
+ // Stop the server
+ IO(Http) ! Http.Unbind
+ }
+
+ override def receive: Receive = {
+ case Http.Bound(address) =>
+ LOG.info(s"FileServer bound on port: ${address.getPort}")
+ context.become(listen(address.getPort))
+ unstashAll()
+ case _ =>
+ stash()
+ }
+
+ def listen(port: Int): Receive = {
+ case FileServer.GetPort => {
+ sender ! FileServer.Port(port)
+ }
+ case Http.Connected(remote, _) =>
+ sender ! Http.Register(self)
+
+ // Fetches files from remote uri
+ case HttpRequest(GET, uri, _, _, _) =>
+ val child = uri.path.toString()
+ val payload = Try {
+ val source = new File(rootDir, child)
+ FileUtils.readFileToByteArray(source)
+ }
+ payload match {
+ case Success(data) =>
+ sender ! HttpResponse(entity = HttpEntity(data))
+ case Failure(ex) =>
+ LOG.error("failed to get file " + ex.getMessage)
+ sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage)
+ }
+ // Save file to remote uri
+ case post@HttpRequest(POST, uri, _, _, _) =>
+ val child = uri.path.toString()
+
+ val status = Try {
+ val target = new File(rootDir, child)
+ val payload = post.entity.data.toByteArray
+ FileUtils.writeByteArrayToFile(target, payload)
+ "OK"
+ }
+ status match {
+ case Success(message) => sender ! HttpResponse(entity = message)
+ case Failure(ex) =>
+ LOG.error("save file failed " + ex.getMessage)
+ sender ! HttpResponse(status = StatusCodes.InternalServerError, entity = ex.getMessage)
+ }
+ }
+}
+
+object FileServer {
+ object GetPort
+ case class Port(port: Int)
+
+ def newClient: Client = new Client
+
+ class Client {
+ val client = new HttpClient()
+
+ def save(uri: String, data: Array[Byte]): Try[Int] = {
+ Try {
+ val post = new PostMethod(uri)
+ val entity = new ByteArrayRequestEntity(data)
+ post.setRequestEntity(entity)
+ client.executeMethod(post)
+ }
+ }
+
+ def get(uri: String): Try[Array[Byte]] = {
+ val get = new GetMethod(uri)
+ val status = Try {
+ client.executeMethod(get)
+ }
+
+ val data = status.flatMap { code =>
+ if (code == 200) {
+ Success(get.getResponseBody())
+ } else {
+ Failure(new Exception(s"We cannot get the data, the status code is $code"))
+ }
+ }
+ data
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
deleted file mode 100644
index 5bafef1..0000000
--- a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.distributeservice
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.testkit.{TestActorRef, TestProbe}
-import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
-
-import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource}
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList}
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
-import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig}
-import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer}
-import io.gearpump.util.ActorSystemBooter.RegisterActorSystem
-import io.gearpump.util.ActorUtil
-
-class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter {
- implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG)
- val mockMaster = TestProbe()(system)
- val mockWorker1 = TestProbe()(system)
- val client = TestProbe()(system)
- val masterProxy = mockMaster.ref
- val appId = 0
- val userName = "test"
- val masterExecutorId = 0
- val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
- val resource = Resource(1)
- val appJar = None
- val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName,
- UserConfig.empty)
-
- "DistService AppMaster" should {
- "responsable for service distributing" in {
- val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", mockWorker1.ref)
- val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy,
- appMasterInfo)
- TestActorRef[DistServiceAppMaster](
- AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext))
- val registerAppMaster = mockMaster.receiveOne(15.seconds)
- assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
-
- val appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
- mockMaster.reply(AppMasterRegistered(appId))
- // The DistributedShell AppMaster will ask for worker list
- mockMaster.expectMsg(GetAllWorkers)
- mockMaster.reply(WorkerList(workerList))
- // After worker list is ready, DistributedShell AppMaster will request resouce on each worker
- workerList.foreach { workerId =>
- mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId,
- relaxation = Relaxation.SPECIFICWORKER)))
- }
- mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref,
- WorkerId(1, 0L)))))
- mockWorker1.expectMsgClass(classOf[LaunchExecutor])
- mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
-
- appMaster.tell(GetFileContainer, client.ref)
- client.expectMsgClass(15.seconds, classOf[FileContainer])
- }
- }
-
- after {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
new file mode 100644
index 0000000..7516138
--- /dev/null
+++ b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.distributeservice
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.testkit.{TestActorRef, TestProbe}
+import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource}
+import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
+import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig}
+import org.apache.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer}
+import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
+import org.apache.gearpump.util.ActorUtil
+
+class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter {
+ implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG)
+ val mockMaster = TestProbe()(system)
+ val mockWorker1 = TestProbe()(system)
+ val client = TestProbe()(system)
+ val masterProxy = mockMaster.ref
+ val appId = 0
+ val userName = "test"
+ val masterExecutorId = 0
+ val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
+ val resource = Resource(1)
+ val appJar = None
+ val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName,
+ UserConfig.empty)
+
+ "DistService AppMaster" should {
+ "responsable for service distributing" in {
+ val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", mockWorker1.ref)
+ val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy,
+ appMasterInfo)
+ TestActorRef[DistServiceAppMaster](
+ AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext))
+ val registerAppMaster = mockMaster.receiveOne(15.seconds)
+ assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
+
+ val appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
+ mockMaster.reply(AppMasterRegistered(appId))
+ // The DistributedShell AppMaster will ask for worker list
+ mockMaster.expectMsg(GetAllWorkers)
+ mockMaster.reply(WorkerList(workerList))
+ // After worker list is ready, DistributedShell AppMaster will request resouce on each worker
+ workerList.foreach { workerId =>
+ mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId,
+ relaxation = Relaxation.SPECIFICWORKER)))
+ }
+ mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref,
+ WorkerId(1, 0L)))))
+ mockWorker1.expectMsgClass(classOf[LaunchExecutor])
+ mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
+
+ appMaster.tell(GetFileContainer, client.ref)
+ client.expectMsgClass(15.seconds, classOf[FileContainer])
+ }
+ }
+
+ after {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/README.md
----------------------------------------------------------------------
diff --git a/examples/pagerank/README.md b/examples/pagerank/README.md
index 20d9032..7fe9220 100644
--- a/examples/pagerank/README.md
+++ b/examples/pagerank/README.md
@@ -2,7 +2,7 @@
After compile,
```scala
-bin\gear io.gearpump.experiments.pagerank.example.PageRankExample
+bin\gear org.apache.gearpump.experiments.pagerank.example.PageRankExample
```
### Syntax
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/resources/geardefault.conf b/examples/pagerank/src/main/resources/geardefault.conf
index d2a4e16..67275f9 100644
--- a/examples/pagerank/src/main/resources/geardefault.conf
+++ b/examples/pagerank/src/main/resources/geardefault.conf
@@ -1,7 +1,7 @@
gearpump {
serializers {
- "io.gearpump.experiments.pagerank.PageRankController$Tick" = ""
- "io.gearpump.experiments.pagerank.PageRankWorker$UpdateWeight" = ""
- "io.gearpump.experiments.pagerank.PageRankWorker$LatestWeight" = ""
+ "org.apache.gearpump.experiments.pagerank.PageRankController$Tick" = ""
+ "org.apache.gearpump.experiments.pagerank.PageRankWorker$UpdateWeight" = ""
+ "org.apache.gearpump.experiments.pagerank.PageRankWorker$LatestWeight" = ""
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
deleted file mode 100644
index 2e37091..0000000
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.pagerank
-
-import akka.actor.ActorSystem
-
-import io.gearpump.cluster.{Application, ApplicationMaster, UserConfig}
-import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.appmaster.AppMaster
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph.Node
-
-/**
- *
- * A simple and naive pagerank implementation.
- *
- * @param name name of the application
- * @param iteration max iteration count
- * @param delta decide the accuracy when the page rank example stops.
- * @param dag the page rank graph
- */
-class PageRankApplication[T](
- override val name: String, iteration: Int, delta: Double, dag: Graph[T, _])
- extends Application {
-
- override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster]
- override def userConfig(implicit system: ActorSystem): UserConfig = {
-
- // Map node with taskId
- var taskId = 0
- val pageRankDag = dag.mapVertex { node =>
- val updatedNode = NodeWithTaskId(taskId, node)
- taskId += 1
- updatedNode
- }
-
- val taskCount = taskId
-
- val userConfig = UserConfig.empty.withValue(PageRankApplication.DAG, pageRankDag).
- withInt(PageRankApplication.ITERATION, iteration).
- withInt(PageRankApplication.COUNT, taskCount).
- withDouble(PageRankApplication.DELTA, delta)
-
- val controller = Processor[PageRankController](1)
- val pageRankWorker = Processor[PageRankWorker](taskCount)
- val partitioner = new HashPartitioner
-
- val app = StreamApplication(name, Graph(controller ~ partitioner ~> pageRankWorker), userConfig)
- app.userConfig
- }
-}
-
-object PageRankApplication {
- val DAG = "PageRank.DAG"
- val ITERATION = "PageRank.Iteration"
- val COUNT = "PageRank.COUNT"
- val DELTA = "PageRank.DELTA"
- val REPORTER = "PageRank.Reporter"
- case class NodeWithTaskId[T](taskId: Int, node: T)
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
deleted file mode 100644
index 0fb689d..0000000
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.pagerank
-
-import akka.actor.Actor.Receive
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.pagerank.PageRankController.Tick
-import io.gearpump.experiments.pagerank.PageRankWorker.LatestWeight
-import io.gearpump.streaming.task._
-
-class PageRankController(taskContext: TaskContext, conf: UserConfig)
- extends Task(taskContext, conf) {
-
- val taskCount = conf.getInt(PageRankApplication.COUNT).get
- val iterationMax = conf.getInt(PageRankApplication.ITERATION).get
- val delta = conf.getDouble(PageRankApplication.DELTA).get
-
- val tasks = (0 until taskCount).toList.map(TaskId(1, _))
-
- var tick: Int = 0
- var receivedWeightForCurrentTick = 0
-
- var weights = Map.empty[TaskId, Double]
- var deltas = Map.empty[TaskId, Double]
-
- override def onStart(startTime: StartTime): Unit = {
- output(Tick(tick), tasks: _*)
- }
-
- private def output(msg: AnyRef, tasks: TaskId*): Unit = {
- taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
- }
-
- override def receiveUnManagedMessage: Receive = {
- case LatestWeight(taskId, weight, replyTick) =>
- if (this.tick == replyTick) {
-
- deltas += taskId -> Math.abs(weight - weights.getOrElse(taskId, 0.0))
- weights += taskId -> weight
- receivedWeightForCurrentTick += 1
- if (receivedWeightForCurrentTick == taskCount) {
- this.tick += 1
- receivedWeightForCurrentTick = 0
- if (continueIteration) {
- LOG.debug(s"next iteration: $tick, weight: $weights, delta: $deltas")
- output(Tick(tick), tasks: _*)
- } else {
- LOG.info(s"iterations: $tick, weight: $weights, delta: $deltas")
- }
- }
- }
- }
-
- private def continueIteration: Boolean = {
- (tick < iterationMax) && deltas.values.foldLeft(false) { (deltaExceed, value) =>
- deltaExceed || value > delta
- }
- }
-}
-
-object PageRankController {
- case class Tick(iteration: Int)
-}