You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/14 11:55:22 UTC
[1/4] incubator-gearpump git commit: [GEARPUMP-224] merge gearpump-daemon to gearpump-core
Repository: incubator-gearpump
Updated Branches:
refs/heads/master a01809b25 -> c3d5eb63f
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala
deleted file mode 100644
index 325a484..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import scala.concurrent.duration._
-
-import akka.actor.Props
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import org.apache.gearpump.cluster.master.InMemoryKVService._
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-
-class InMemoryKVServiceSpec
- extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-
- override def beforeEach(): Unit = {
- startActorSystem()
- }
-
- override def afterEach(): Unit = {
- shutdownActorSystem()
- }
-
- override def config: Config = TestUtil.MASTER_CONFIG
-
- "KVService" should "get, put, delete correctly" in {
- val system = getActorSystem
- val kvService = system.actorOf(Props(new InMemoryKVService()))
- val group = "group"
-
- val client = TestProbe()(system)
-
- client.send(kvService, PutKV(group, "key", 1))
- client.expectMsg(PutKVSuccess)
-
- client.send(kvService, PutKV(group, "key", 2))
- client.expectMsg(PutKVSuccess)
-
- client.send(kvService, GetKV(group, "key"))
- client.expectMsg(GetKVSuccess("key", 2))
-
- client.send(kvService, DeleteKVGroup(group))
-
- // After DeleteGroup, it no longer accept Get and Put message for this group.
- client.send(kvService, GetKV(group, "key"))
- client.expectNoMsg(3.seconds)
-
- client.send(kvService, PutKV(group, "key", 3))
- client.expectNoMsg(3.seconds)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
deleted file mode 100644
index e82dff3..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.scheduler
-
-import org.apache.gearpump.cluster.worker.WorkerId
-
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
-import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
-import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
-import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-
-class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
- with WordSpecLike with Matchers with BeforeAndAfterAll{
-
- def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
- val appId = 0
- val workerId1: WorkerId = WorkerId(1, 0L)
- val workerId2: WorkerId = WorkerId(2, 0L)
- val mockAppMaster = TestProbe()
- val mockWorker1 = TestProbe()
- val mockWorker2 = TestProbe()
-
- override def afterAll {
- TestKit.shutdownActorSystem(system)
- }
-
- "The scheduler" should {
- "update resource only when the worker is registered" in {
- val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
- scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
- expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " +
- s"registered into master"))
- }
-
- "drop application's resource requests when the application is removed" in {
- val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
- val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
- val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
- scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
- scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
- scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
- scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
- scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
- mockAppMaster.expectNoMsg(5.seconds)
- }
- }
-
- def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = {
- left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
- }
-
- "The resource request with higher priority" should {
- "be handled first" in {
- val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
- val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY)
- val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY)
- val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY)
-
- scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
- scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
- scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
- scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
- scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
- scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
-
- var expect = ResourceAllocated(
- Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))
- mockAppMaster.expectMsgPF(5.seconds) {
- case request: ResourceAllocated if sameElement(request, expect) => Unit
- }
-
- expect = ResourceAllocated(
- Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
- mockAppMaster.expectMsgPF(5.seconds) {
- case request: ResourceAllocated if sameElement(request, expect) => Unit
- }
-
- expect = ResourceAllocated(
- Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
- mockAppMaster.expectMsgPF(5.seconds) {
- case request: ResourceAllocated if sameElement(request, expect) => Unit
- }
-
- scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
- scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
- scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
-
- expect = ResourceAllocated(
- Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
- mockAppMaster.expectMsgPF(5.seconds) {
- case request: ResourceAllocated if sameElement(request, expect) => Unit
- }
- }
- }
-
- "The resource request which delivered earlier" should {
- "be handled first if the priorities are the same" in {
- val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
- val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
- val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
- scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
- scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
- scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
- scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
-
- var expect = ResourceAllocated(
- Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
- mockAppMaster.expectMsgPF(5.seconds) {
- case request: ResourceAllocated if sameElement(request, expect) => Unit
- }
- expect = ResourceAllocated(
- Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
- mockAppMaster.expectMsgPF(5.seconds) {
- case request: ResourceAllocated if sameElement(request, expect) => Unit
- }
- }
- }
-
- "The PriorityScheduler" should {
- "handle the resource request with different relaxation" in {
- val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
- val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER)
- val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER)
-
- scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
- scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
- scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
- scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
-
- var expect = ResourceAllocated(
- Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
- mockAppMaster.expectMsgPF(5.seconds) {
- case request: ResourceAllocated if sameElement(request, expect) => Unit
- }
-
- scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
- scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
-
- expect = ResourceAllocated(
- Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
- mockAppMaster.expectMsgPF(5.seconds) {
- case request: ResourceAllocated if sameElement(request, expect) => Unit
- }
-
- val request3 = ResourceRequest(
- Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2)
- scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
-
- expect = ResourceAllocated(Array(
- ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
- ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))
- mockAppMaster.expectMsgPF(5.seconds) {
- case request: ResourceAllocated if sameElement(request, expect) => Unit
- }
-
- // We have to manually update the resource on each worker
- scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
- scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
- val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER)
- scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
-
- expect = ResourceAllocated(
- Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))
- mockAppMaster.expectMsgPF(5.seconds) {
- case request: ResourceAllocated if sameElement(request, expect) => Unit
- }
- }
- }
-
- "The PriorityScheduler" should {
- "handle the resource request with different executor number" in {
- val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
- scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
- scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
- scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
- scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
-
- // By default, the request requires only one executor
- val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
- scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
- val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
- assert(allocations2.allocations.length == 1)
- assert(allocations2.allocations.head.resource == Resource(20))
-
- val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
- scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
- val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
- assert(allocations3.allocations.length == 3)
- assert(allocations3.allocations.forall(_.resource == Resource(8)))
-
- // The total available resource can not satisfy the requirements with executor number
- scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
- scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
- val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
- scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
- val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
- assert(allocations4.allocations.length == 2)
- assert(allocations4.allocations.forall(_.resource == Resource(20)))
-
- // When new resources are available, the remaining request will be satisfied
- scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
- val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
- assert(allocations5.allocations.length == 1)
- assert(allocations4.allocations.forall(_.resource == Resource(20)))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
deleted file mode 100644
index bf25057..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.worker
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, PoisonPill, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest._
-
-import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor}
-import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
-import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed}
-import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
-import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
-
-class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
- override def config: Config = TestUtil.DEFAULT_CONFIG
-
- val appId = 1
- val workerId: WorkerId = WorkerId(1, 0L)
- val executorId = 1
- var masterProxy: TestProbe = null
- var mockMaster: TestProbe = null
- var client: TestProbe = null
- val workerSlots = 50
-
- override def beforeEach(): Unit = {
- startActorSystem()
- mockMaster = TestProbe()(getActorSystem)
- masterProxy = TestProbe()(getActorSystem)
- client = TestProbe()(getActorSystem)
- }
-
- override def afterEach(): Unit = {
- shutdownActorSystem()
- }
-
- "The new started worker" should {
- "kill itself if no response from Master after registering" in {
- val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
- mockMaster watch worker
- mockMaster.expectMsg(RegisterNewWorker)
- mockMaster.expectTerminated(worker, 60.seconds)
- }
- }
-
- "Worker" should {
- "init its resource from the gearpump config" in {
- val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots").
- withFallback(TestUtil.DEFAULT_CONFIG)
- val workerSystem = ActorSystem("WorkerSystem", config)
- val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
- mockMaster watch worker
- mockMaster.expectMsg(RegisterNewWorker)
-
- worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
- mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots)))
-
- worker.tell(
- UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
- mockMaster.expectTerminated(worker, 5.seconds)
- workerSystem.terminate()
- Await.result(workerSystem.whenTerminated, Duration.Inf)
- }
- }
-
- "Worker" should {
- "update its remaining resource when launching and shutting down executors" in {
- val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref))
- masterProxy.expectMsg(RegisterNewWorker)
-
- worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
- mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
-
- val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
- // This is an actor path which the ActorSystemBooter will report back to,
- // not needed in this test
- val reportBack = "dummy"
- val executionContext = ExecutorJVMConfig(Array.empty[String],
- getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "),
- classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None,
- username = "user")
-
- // Test LaunchExecutor
- worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext),
- mockMaster.ref)
- mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine"))
-
- worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref)
- mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
-
- worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref)
- mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
-
- // Test terminationWatch
- worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref)
- mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
- client.expectMsg(ShutdownExecutorSucceed(1, 1))
-
- worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref)
- client.expectMsg(ShutdownExecutorFailed(
- s"Can not find executor ${executorId + 1} for app $appId"))
-
- mockMaster.ref ! PoisonPill
- masterProxy.expectMsg(RegisterWorker(workerId))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
index 84dec70..2988f5b 100644
--- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
@@ -20,7 +20,6 @@ package org.apache.gearpump.redis
import java.nio.charset.Charset
object RedisMessage {
-
private def toBytes(strings: List[String]): List[Array[Byte]] =
strings.map(string => string.getBytes(Charset.forName("UTF8")))
@@ -48,11 +47,10 @@ object RedisMessage {
* @param latitude
* @param member
*/
- case class GEOADD(key: Array[Byte], longitude: Double,
- latitude: Double, member: Array[Byte]) {
- def this(key: String, longitude: Double,
- latitude: Double, member: String) =
+ case class GEOADD(key: Array[Byte], longitude: Double, latitude: Double, member: Array[Byte]) {
+ def this(key: String, longitude: Double, latitude: Double, member: String) = {
this(toBytes(key), longitude, latitude, toBytes(member))
+ }
}
}
@@ -66,7 +64,9 @@ object RedisMessage {
* @param field
*/
case class HDEL(key: Array[Byte], field: Array[Byte]) {
- def this(key: String, field: String) = this(toBytes(key), toBytes(field))
+ def this(key: String, field: String) = {
+ this(toBytes(key), toBytes(field))
+ }
}
/**
@@ -77,8 +77,9 @@ object RedisMessage {
* @param increment
*/
case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) {
- def this(key: String, field: String, increment: Long) =
+ def this(key: String, field: String, increment: Long) = {
this(toBytes(key), toBytes(field), increment)
+ }
}
/**
@@ -89,8 +90,9 @@ object RedisMessage {
* @param increment
*/
case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) {
- def this(key: String, field: String, increment: Float) =
+ def this(key: String, field: String, increment: Float) = {
this(toBytes(key), toBytes(field), increment)
+ }
}
@@ -102,8 +104,9 @@ object RedisMessage {
* @param value
*/
case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
- def this(key: String, field: String, value: String) =
+ def this(key: String, field: String, value: String) = {
this(toBytes(key), toBytes(field), toBytes(value))
+ }
}
/**
@@ -114,8 +117,9 @@ object RedisMessage {
* @param value
*/
case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
- def this(key: String, field: String, value: String) =
+ def this(key: String, field: String, value: String) = {
this(toBytes(key), toBytes(field), toBytes(value))
+ }
}
}
@@ -142,8 +146,9 @@ object RedisMessage {
* @param value
*/
case class LPUSH(key: Array[Byte], value: Array[Byte]) {
-
- def this(key: String, value: String) = this(key, toBytes(value))
+ def this(key: String, value: String) = {
+ this(key, toBytes(value))
+ }
}
/**
@@ -153,7 +158,9 @@ object RedisMessage {
* @param value
*/
case class LPUSHX(key: Array[Byte], value: Array[Byte]) {
- def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ def this(key: String, value: String) = {
+ this(toBytes(key), toBytes(value))
+ }
}
/**
@@ -164,7 +171,9 @@ object RedisMessage {
* @param value
*/
case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) {
- def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value))
+ def this(key: String, index: Long, value: String) = {
+ this(toBytes(key), index, toBytes(value))
+ }
}
/**
@@ -174,8 +183,9 @@ object RedisMessage {
* @param value
*/
case class RPUSH(key: Array[Byte], value: Array[Byte]) {
-
- def this(key: String, value: String) = this(key, toBytes(value))
+ def this(key: String, value: String) = {
+ this(key, toBytes(value))
+ }
}
/**
@@ -185,7 +195,9 @@ object RedisMessage {
* @param value
*/
case class RPUSHX(key: Array[Byte], value: Array[Byte]) {
- def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ def this(key: String, value: String) = {
+ this(toBytes(key), toBytes(value))
+ }
}
}
@@ -198,8 +210,9 @@ object RedisMessage {
* @param message
*/
case class DEL(message: Array[Byte]) {
-
- def this(message: String) = this(toBytes(message))
+ def this(message: String) = {
+ this(toBytes(message))
+ }
}
/**
@@ -208,7 +221,9 @@ object RedisMessage {
* @param key
*/
case class EXPIRE(key: Array[Byte], seconds: Int) {
- def this(key: String, seconds: Int) = this(toBytes(key), seconds)
+ def this(key: String, seconds: Int) = {
+ this(toBytes(key), seconds)
+ }
}
/**
@@ -218,7 +233,9 @@ object RedisMessage {
* @param timestamp
*/
case class EXPIREAT(key: Array[Byte], timestamp: Long) {
- def this(key: String, timestamp: Long) = this(toBytes(key), timestamp)
+ def this(key: String, timestamp: Long) = {
+ this(toBytes(key), timestamp)
+ }
}
/**
@@ -230,9 +247,11 @@ object RedisMessage {
* @param database
* @param timeout
*/
- case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) {
- def this(host: String, port: Int, key: String, database: Int, timeout: Int) =
+ case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte],
+ database: Int, timeout: Int) {
+ def this(host: String, port: Int, key: String, database: Int, timeout: Int) = {
this(toBytes(host), port, toBytes(key), database, timeout)
+ }
}
/**
@@ -242,7 +261,9 @@ object RedisMessage {
* @param db
*/
case class MOVE(key: Array[Byte], db: Int) {
- def this(key: String, db: Int) = this(toBytes(key), db)
+ def this(key: String, db: Int) = {
+ this(toBytes(key), db)
+ }
}
/**
@@ -251,7 +272,9 @@ object RedisMessage {
* @param key
*/
case class PERSIST(key: Array[Byte]) {
- def this(key: String) = this(toBytes(key))
+ def this(key: String) = {
+ this(toBytes(key))
+ }
}
/**
@@ -261,7 +284,9 @@ object RedisMessage {
* @param milliseconds
*/
case class PEXPIRE(key: Array[Byte], milliseconds: Long) {
- def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+ def this(key: String, milliseconds: Long) = {
+ this(toBytes(key), milliseconds)
+ }
}
/**
@@ -271,7 +296,9 @@ object RedisMessage {
* @param timestamp
*/
case class PEXPIREAT(key: Array[Byte], timestamp: Long) {
- def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+ def this(key: String, milliseconds: Long) = {
+ this(toBytes(key), milliseconds)
+ }
}
/**
@@ -281,7 +308,9 @@ object RedisMessage {
* @param newKey
*/
case class RENAME(key: Array[Byte], newKey: Array[Byte]) {
- def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+ def this(key: String, newKey: String) = {
+ this(toBytes(key), toBytes(newKey))
+ }
}
/**
@@ -291,7 +320,9 @@ object RedisMessage {
* @param newKey
*/
case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) {
- def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+ def this(key: String, newKey: String) = {
+ this(toBytes(key), toBytes(newKey))
+ }
}
}
@@ -306,8 +337,9 @@ object RedisMessage {
* @param members
*/
case class SADD(key: Array[Byte], members: Array[Byte]) {
-
- def this(key: String, members: String) = this(key, toBytes(members))
+ def this(key: String, members: String) = {
+ this(key, toBytes(members))
+ }
}
@@ -319,8 +351,9 @@ object RedisMessage {
* @param member
*/
case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) {
- def this(source: String, destination: String, member: String) =
+ def this(source: String, destination: String, member: String) = {
this(toBytes(source), toBytes(destination), toBytes(member))
+ }
}
@@ -331,8 +364,9 @@ object RedisMessage {
* @param member
*/
case class SREM(key: Array[Byte], member: Array[Byte]) {
-
- def this(key: String, member: String) = this(key, toBytes(member))
+ def this(key: String, member: String) = {
+ this(key, toBytes(member))
+ }
}
}
@@ -346,7 +380,9 @@ object RedisMessage {
* @param value
*/
case class APPEND(key: Array[Byte], value: Array[Byte]) {
- def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ def this(key: String, value: String) = {
+ this(toBytes(key), toBytes(value))
+ }
}
/**
@@ -355,7 +391,9 @@ object RedisMessage {
* @param key
*/
case class DECR(key: Array[Byte]) {
- def this(key: String) = this(toBytes(key))
+ def this(key: String) = {
+ this(toBytes(key))
+ }
}
/**
@@ -365,7 +403,9 @@ object RedisMessage {
* @param decrement
*/
case class DECRBY(key: Array[Byte], decrement: Int) {
- def this(key: String, decrement: Int) = this(toBytes(key), decrement)
+ def this(key: String, decrement: Int) = {
+ this(toBytes(key), decrement)
+ }
}
/**
@@ -374,7 +414,9 @@ object RedisMessage {
* @param key
*/
case class INCR(key: Array[Byte]) {
- def this(key: String) = this(toBytes(key))
+ def this(key: String) = {
+ this(toBytes(key))
+ }
}
/**
@@ -384,7 +426,9 @@ object RedisMessage {
* @param increment
*/
case class INCRBY(key: Array[Byte], increment: Int) {
- def this(key: String, increment: Int) = this(toBytes(key), increment)
+ def this(key: String, increment: Int) = {
+ this(toBytes(key), increment)
+ }
}
/**
@@ -394,7 +438,9 @@ object RedisMessage {
* @param increment
*/
case class INCRBYFLOAT(key: Array[Byte], increment: Double) {
- def this(key: String, increment: Number) = this(toBytes(key), increment)
+ def this(key: String, increment: Number) = {
+ this(toBytes(key), increment)
+ }
}
@@ -405,7 +451,9 @@ object RedisMessage {
* @param value
*/
case class SET(key: Array[Byte], value: Array[Byte]) {
- def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ def this(key: String, value: String) = {
+ this(toBytes(key), toBytes(value))
+ }
}
/**
@@ -416,7 +464,9 @@ object RedisMessage {
* @param value
*/
case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) {
- def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value))
+ def this(key: String, offset: Long, value: String) = {
+ this(toBytes(key), offset, toBytes(value))
+ }
}
/**
@@ -427,7 +477,9 @@ object RedisMessage {
* @param value
*/
case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) {
- def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value))
+ def this(key: String, seconds: Int, value: String) = {
+ this(toBytes(key), seconds, toBytes(value))
+ }
}
/**
@@ -437,7 +489,9 @@ object RedisMessage {
* @param value
*/
case class SETNX(key: Array[Byte], value: Array[Byte]) {
- def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ def this(key: String, value: String) = {
+ this(toBytes(key), toBytes(value))
+ }
}
/**
@@ -448,9 +502,9 @@ object RedisMessage {
* @param value
*/
case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) {
- def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value))
+ def this(key: String, offset: Int, value: String) = {
+ this(toBytes(key), offset, toBytes(value))
+ }
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
index 3f75949..36a9fe3 100644
--- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
@@ -32,20 +32,20 @@ import redis.clients.jedis.Jedis
import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT}
/**
- * Save message in Redis Instance
- *
- * @param host
- * @param port
- * @param timeout
- * @param database
- * @param password
- */
+ * Save message in Redis Instance
+ *
+ * @param host
+ * @param port
+ * @param timeout
+ * @param database
+ * @param password
+ */
class RedisSink(
- host: String = DEFAULT_HOST,
- port: Int = DEFAULT_PORT,
- timeout: Int = DEFAULT_TIMEOUT,
- database: Int = DEFAULT_DATABASE,
- password: String = "") extends DataSink {
+ host: String = DEFAULT_HOST,
+ port: Int = DEFAULT_PORT,
+ timeout: Int = DEFAULT_TIMEOUT,
+ database: Int = DEFAULT_DATABASE,
+ password: String = "") extends DataSink {
private val LOG = LogUtil.getLogger(getClass)
@transient private lazy val client = new Jedis(host, port, timeout)
@@ -59,7 +59,6 @@ class RedisSink(
}
override def write(message: Message): Unit = {
-
message.msg match {
// GEO
case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 4552a64..40b5743 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -140,15 +140,6 @@ object Build extends sbt.Build {
publishArtifact in Test := false
)
- val daemonDependencies = Seq(
- libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
- "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
- "commons-logging" % "commons-logging" % commonsLoggingVersion,
- "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion
- )
- )
-
val coreDependencies = Seq(
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % slf4jVersion,
@@ -171,6 +162,10 @@ object Build extends sbt.Build {
"com.typesafe.akka" %% "akka-remote" % akkaVersion
exclude("io.netty", "netty"),
+ "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
+ "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
+ "commons-logging" % "commons-logging" % commonsLoggingVersion,
+ "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion,
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-agent" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
@@ -256,7 +251,7 @@ object Build extends sbt.Build {
id = "gearpump",
base = file("."),
settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting)
- .aggregate(shaded, core, daemon, streaming, services, external_kafka, external_monoid,
+ .aggregate(shaded, core, streaming, services, external_kafka, external_monoid,
external_serializer, examples, storm, yarn, external_hbase, gearpumpHadoop, packProject,
external_hadoopfs, integration_test).settings(Defaults.itSettings: _*)
.disablePlugins(sbtassembly.AssemblyPlugin)
@@ -271,20 +266,13 @@ object Build extends sbt.Build {
getShadedDepXML(organization.value, shaded_guava.id, version.value),
getShadedDepXML(organization.value, shaded_metrics_graphite.id, version.value)), node)
}
- ))
- .disablePlugins(sbtassembly.AssemblyPlugin)
+ )).disablePlugins(sbtassembly.AssemblyPlugin)
- lazy val daemon = Project(
- id = "gearpump-daemon",
- base = file("daemon"),
- settings = commonSettings ++ daemonDependencies)
- .dependsOn(core % "test->test; compile->compile", cgroup % "test->test; compile->compile")
- .disablePlugins(sbtassembly.AssemblyPlugin)
lazy val cgroup = Project(
id = "gearpump-experimental-cgroup",
base = file("experiments/cgroup"),
- settings = commonSettings ++ noPublish ++ daemonDependencies)
+ settings = commonSettings ++ noPublish)
.dependsOn (core % "test->test; compile->compile")
.disablePlugins(sbtassembly.AssemblyPlugin)
@@ -301,7 +289,7 @@ object Build extends sbt.Build {
getShadedDepXML(organization.value, shaded_gs_collections.id, version.value)), node)
}
))
- .dependsOn(core % "test->test; compile->compile", shaded_gs_collections, daemon % "test->test")
+ .dependsOn(core % "test->test; compile->compile", shaded_gs_collections)
.disablePlugins(sbtassembly.AssemblyPlugin)
lazy val external_kafka = Project(
@@ -412,19 +400,18 @@ object Build extends sbt.Build {
),
mainClass in(Compile, packageBin) := Some("akka.stream.gearpump.example.Test")
))
- .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+ .dependsOn(streaming % "test->test; provided")
lazy val redis = Project(
id = "gearpump-experiments-redis",
base = file("experiments/redis"),
- settings = commonSettings ++ noPublish ++ myAssemblySettings ++
+ settings = commonSettings ++ noPublish ++
Seq(
libraryDependencies ++= Seq(
"redis.clients" % "jedis" % "2.9.0"
- ),
- mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test")
- ))
- .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+ )
+ )
+ ).dependsOn(streaming % "test->test; provided")
lazy val storm = Project(
id = "gearpump-experiments-storm",
@@ -489,7 +476,7 @@ object Build extends sbt.Build {
"org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % hadoopVersion % "provided"
)
))
- .dependsOn(services % "test->test;compile->compile", daemon % "provided",
+ .dependsOn(services % "test->test;compile->compile",
core % "provided", gearpumpHadoop).disablePlugins(sbtassembly.AssemblyPlugin)
lazy val external_hbase = Project(
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/project/BuildExample.scala
----------------------------------------------------------------------
diff --git a/project/BuildExample.scala b/project/BuildExample.scala
index 75fc9be..fadc1ec 100644
--- a/project/BuildExample.scala
+++ b/project/BuildExample.scala
@@ -42,7 +42,7 @@ object BuildExample extends sbt.Build {
target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" /
CrossVersion.binaryScalaVersion(scalaVersion.value)
)
- ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+ ) dependsOn(streaming % "test->test; provided")
lazy val wordcount = Project(
id = "gearpump-examples-wordcount",
@@ -55,7 +55,7 @@ object BuildExample extends sbt.Build {
target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" /
CrossVersion.binaryScalaVersion(scalaVersion.value)
)
- ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+ ) dependsOn(streaming % "test->test; provided")
lazy val sol = Project(
id = "gearpump-examples-sol",
@@ -113,7 +113,7 @@ object BuildExample extends sbt.Build {
target in assembly := baseDirectory.value.getParentFile / "target" /
CrossVersion.binaryScalaVersion(scalaVersion.value)
)
- ) dependsOn (daemon % "test->test; provided")
+ ) dependsOn (core % "test->test; provided")
lazy val distributeservice = Project(
id = "gearpump-examples-distributeservice",
@@ -133,7 +133,7 @@ object BuildExample extends sbt.Build {
target in assembly := baseDirectory.value.getParentFile / "target" /
CrossVersion.binaryScalaVersion(scalaVersion.value)
)
- ) dependsOn (daemon % "test->test; provided")
+ ) dependsOn (core % "test->test; provided")
lazy val fsio = Project(
id = "gearpump-examples-fsio",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index 13e53de..54b1d43 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -24,7 +24,6 @@ import xerial.sbt.Pack._
object Pack extends sbt.Build {
val daemonClassPath = Seq(
"${PROG_HOME}/conf",
- "${PROG_HOME}/lib/daemon/*",
// This is for DFSJarStore
"${PROG_HOME}/lib/yarn/*"
)
@@ -37,14 +36,12 @@ object Pack extends sbt.Build {
val serviceClassPath = Seq(
"${PROG_HOME}/conf",
- "${PROG_HOME}/lib/daemon/*",
"${PROG_HOME}/lib/services/*",
"${PROG_HOME}/dashboard"
)
val yarnClassPath = Seq(
"${PROG_HOME}/conf",
- "${PROG_HOME}/lib/daemon/*",
"${PROG_HOME}/lib/services/*",
"${PROG_HOME}/lib/yarn/*",
"${PROG_HOME}/conf/yarnconf",
@@ -112,11 +109,10 @@ object Pack extends sbt.Build {
"-Dgearpump.home=${PROG_HOME}")
),
packLibDir := Map(
- "lib" -> new ProjectsToPack(core.id, streaming.id),
- "lib/daemon" -> new ProjectsToPack(daemon.id, cgroup.id).exclude(core.id, streaming.id),
+ "lib" -> new ProjectsToPack(core.id, cgroup.id, streaming.id),
"lib/yarn" -> new ProjectsToPack(gearpumpHadoop.id, yarn.id).
- exclude(services.id, daemon.id, core.id),
- "lib/services" -> new ProjectsToPack(services.id).exclude(daemon.id),
+ exclude(services.id, core.id),
+ "lib/services" -> new ProjectsToPack(services.id).exclude(core.id),
"lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id)
),
packExclude := Seq(thisProjectRef.value.project),
@@ -139,7 +135,7 @@ object Pack extends sbt.Build {
"gear" -> applicationClassPath,
"local" -> daemonClassPath,
"master" -> daemonClassPath,
- "worker" -> daemonClassPath,
+ "worker" -> applicationClassPath,
"services" -> serviceClassPath,
"yarnclient" -> yarnClassPath,
"storm" -> stormClassPath
@@ -149,6 +145,6 @@ object Pack extends sbt.Build {
packArchiveExcludes := Seq("integrationtest")
)
- ).dependsOn(core, streaming, services, yarn, storm).
+ ).dependsOn(core, streaming, services, yarn, storm, cgroup).
disablePlugins(sbtassembly.AssemblyPlugin)
}
[3/4] incubator-gearpump git commit: [GEARPUMP-224] merge
gearpump-daemon to gearpump-core
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
new file mode 100644
index 0000000..447b034
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.net.URL
+import java.util.concurrent.{Executors, TimeUnit}
+
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor._
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceSucceed, UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster._
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher
+import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
+import org.apache.gearpump.jarstore.JarStoreClient
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.util.ActorSystemBooter.Daemon
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util.{TimeOutScheduler, _}
+import org.slf4j.Logger
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Worker is used to track the resource on single machine, it is like
+ * the node manager of YARN.
+ *
+ * @param masterProxy masterProxy is used to resolve the master
+ */
+private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
+ private val systemConfig: Config = context.system.settings.config
+
+ private val address = ActorUtil.getFullPath(context.system, self.path)
+ private var resource = Resource.empty
+ private var allocatedResources = Map[ActorRef, Resource]()
+ private var executorsInfo = Map[ActorRef, ExecutorSlots]()
+ private var id: WorkerId = WorkerId.unspecified
+ private val createdTime = System.currentTimeMillis()
+ private var masterInfo: MasterInfo = null
+ private var executorNameToActor = Map.empty[String, ActorRef]
+ private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
+ private val jarStoreClient = new JarStoreClient(systemConfig, context.system)
+
+ private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
+ private val resourceUpdateTimeoutMs = 30000 // Milliseconds
+
+ private var totalSlots: Int = 0
+
+ val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+ var historyMetricsService: Option[ActorRef] = None
+
+ override def receive: Receive = null
+ var LOG: Logger = LogUtil.getLogger(getClass)
+
+ def service: Receive =
+ appMasterMsgHandler orElse
+ clientMessageHandler orElse
+ metricsService orElse
+ terminationWatch(masterInfo.master) orElse
+ ActorUtil.defaultMsgHandler(self)
+
+ def metricsService: Receive = {
+ case query: QueryHistoryMetrics =>
+ if (historyMetricsService.isEmpty) {
+ // Returns empty metrics so that we don't hang the UI
+ sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+ } else {
+ historyMetricsService.get forward query
+ }
+ }
+
+ private var metricsInitialized = false
+
+ val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+
+ private def initializeMetrics(): Unit = {
+ // Registers jvm metrics
+ val metricsSetName = "worker" + WorkerId.render(id)
+ Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
+
+ historyMetricsService = if (metricsEnabled) {
+ val historyMetricsService = {
+ context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
+ }
+
+ val metricsReportService = context.actorOf(Props(
+ new MetricsReporterService(Metrics(context.system))))
+ historyMetricsService.tell(ReportMetrics, metricsReportService)
+ Some(historyMetricsService)
+ } else {
+ None
+ }
+ }
+
+ def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
+
+ // If the master gets disconnected, WorkerRegistered may be triggered multiple times.
+ case WorkerRegistered(id, masterInfo) =>
+ this.id = id
+
+ // Adds the flag check, so that we don't re-initialize the metrics when worker re-register
+ // itself.
+ if (!metricsInitialized) {
+ initializeMetrics()
+ metricsInitialized = true
+ }
+
+ this.masterInfo = masterInfo
+ timeoutTicker.cancel()
+ context.watch(masterInfo.master)
+ this.LOG = LogUtil.getLogger(getClass, worker = id)
+ LOG.info(s"Worker is registered. " +
+ s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
+ sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
+ resourceUpdateTimeoutMs, updateResourceTimeOut())
+ context.become(service)
+ }
+
+ private def updateResourceTimeOut(): Unit = {
+ LOG.error(s"Update worker resource time out")
+ }
+
+ def appMasterMsgHandler: Receive = {
+ case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
+ val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+ val executorToStop = executorNameToActor.get(actorName)
+ if (executorToStop.isDefined) {
+ LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " +
+ s"due to: $reason")
+ executorToStop.get.forward(shutdown)
+ } else {
+ LOG.error(s"Cannot find executor $actorName, ignore this message")
+ sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
+ }
+ case launch: LaunchExecutor =>
+ LOG.info(s"$launch")
+ if (resource < launch.resource) {
+ sender ! ExecutorLaunchRejected("There is no free resource on this machine")
+ } else {
+ val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
+
+ val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
+ jarStoreClient, executorProcLauncher))
+ executorNameToActor += actorName -> executor
+
+ resource = resource - launch.resource
+ allocatedResources = allocatedResources + (executor -> launch.resource)
+
+ reportResourceToMaster()
+ executorsInfo += executor ->
+ ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
+ context.watch(executor)
+ }
+ case UpdateResourceFailed(reason, ex) =>
+ LOG.error(reason)
+ context.stop(self)
+ case UpdateResourceSucceed =>
+ LOG.info(s"Update resource succeed")
+ case GetWorkerData(workerId) =>
+ val aliveFor = System.currentTimeMillis() - createdTime
+ val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+ val userDir = System.getProperty("user.dir")
+ sender ! WorkerData(WorkerSummary(
+ id, "active",
+ address,
+ aliveFor,
+ logDir,
+ executorsInfo.values.toArray,
+ totalSlots,
+ resource.slots,
+ userDir,
+ jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+ resourceManagerContainerId = systemConfig.getString(
+ GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
+ historyMetricsConfig = getHistoryMetricsConfig)
+ )
+ case ChangeExecutorResource(appId, executorId, usedResource) =>
+ for (executor <- executorActorRef(appId, executorId);
+ allocatedResource <- allocatedResources.get(executor)) {
+
+ allocatedResources += executor -> usedResource
+ resource = resource + allocatedResource - usedResource
+ reportResourceToMaster()
+
+ if (usedResource == Resource(0)) {
+ executorsInfo -= executor
+ allocatedResources -= executor
+ // Stop the executor if there is no resource bound to it.
+ LOG.info(s"Shutdown executor $executorId because the resource used is zero")
+ executor ! ShutdownExecutor(appId, executorId,
+ "Shutdown executor because the resource used is zero")
+ }
+ }
+ }
+
+ private def reportResourceToMaster(): Unit = {
+ sendMsgWithTimeOutCallBack(masterInfo.master,
+ ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
+ }
+
+ private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
+ val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+ executorNameToActor.get(actorName)
+ }
+
+ def clientMessageHandler: Receive = {
+ case QueryWorkerConfig(workerId) =>
+ if (this.id == workerId) {
+ sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+ } else {
+ sender ! WorkerConfig(ConfigFactory.empty)
+ }
+ }
+
+ private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
+ repeatActionUtil(
+ seconds = timeOutSeconds,
+ action = () => {
+ masterProxy ! RegisterWorker(workerId)
+ },
+ onTimeout = () => {
+ LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
+ s"seconds, abort and kill the worker...")
+ self ! PoisonPill
+ })
+ }
+
+ def terminationWatch(master: ActorRef): Receive = {
+ case Terminated(actor) =>
+ if (actor.compareTo(master) == 0) {
+ // The watched master is down. Instead of stopping, go back to waiting for a WorkerRegistered
+ // reply while retryRegisterWorker keeps re-sending RegisterWorker for up to 30 seconds.
+ LOG.info(s"Master cannot be contacted, find a new master ...")
+ context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
+ } else if (ActorUtil.isChildActorPath(self, actor)) {
+ // One executor is down,
+ LOG.info(s"Executor is down ${getExecutorName(actor)}")
+
+ val allocated = allocatedResources.get(actor)
+ if (allocated.isDefined) {
+ resource = resource + allocated.get
+ executorsInfo -= actor
+ allocatedResources = allocatedResources - actor
+ sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
+ resourceUpdateTimeoutMs, updateResourceTimeOut())
+ }
+ }
+ }
+
+ private def getExecutorName(actorRef: ActorRef): Option[String] = {
+ executorNameToActor.find(_._2 == actorRef).map(_._1)
+ }
+
+ private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
+ val launcherClazz = Class.forName(
+ systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
+ launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
+ .asInstanceOf[ExecutorProcessLauncher]
+ }
+
+ import context.dispatcher
+ override def preStart(): Unit = {
+ LOG.info(s"RegisterNewWorker")
+ totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
+ this.resource = Resource(totalSlots)
+ masterProxy ! RegisterNewWorker
+ context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
+ }
+
+ private def registerTimeoutTicker(seconds: Int): Cancellable = {
+ repeatActionUtil(seconds, () => (), () => { // no-op tick: only the timeout callback matters
+ LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
+ s"abort and kill the worker...")
+ self ! PoisonPill
+ })
+ }
+
+ private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
+ : Cancellable = {
+ val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
+ Duration(2, TimeUnit.SECONDS))(action())
+ val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
+ new Cancellable {
+ def cancel(): Boolean = {
+ val result1 = cancelTimeout.cancel()
+ val result2 = cancelSuicide.cancel()
+ result1 && result2
+ }
+
+ def isCancelled: Boolean = {
+ cancelTimeout.isCancelled && cancelSuicide.isCancelled
+ }
+ }
+ }
+
+ override def postStop(): Unit = {
+ LOG.info(s"Worker is going down....")
+ ioPool.shutdown()
+ context.system.terminate()
+ }
+}
+
+private[cluster] object Worker {
+
+ case class ExecutorResult(result: Try[Int])
+
+ class ExecutorWatcher(
+ launch: LaunchExecutor,
+ masterInfo: MasterInfo,
+ ioPool: ExecutionContext,
+ jarStoreClient: JarStoreClient,
+ procLauncher: ExecutorProcessLauncher) extends Actor {
+ import launch.{appId, executorId, resource}
+
+ private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
+
+ val executorConfig: Config = {
+ val workerConfig = context.system.settings.config
+
+ val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
+ Option(jvmConfig.executorAkkaConfig)
+ }.getOrElse(ConfigFactory.empty())
+
+ resolveExecutorConfig(workerConfig, submissionConfig)
+ }
+
+ // For some settings the worker config takes priority; for the others the user's application
+ // submission config takes priority.
+ private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
+ val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
+ .withoutPath(GEARPUMP_CLUSTER_MASTERS)
+ .withoutPath(GEARPUMP_HOME)
+ .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
+ .withoutPath(GEARPUMP_LOG_APPLICATION_DIR)
+ .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
+ // Falls back to workerConfig
+ .withFallback(workerConfig)
+
+ // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
+ val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
+ val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
+ LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
+ config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
+ } else {
+ config
+ }
+
+ // Excludes reference.conf, and JVM properties..
+ ClusterConfig.filterOutDefaultConfig(updatedConf)
+ }
+
+ implicit val executorService = ioPool
+
+ private val executorHandler = {
+ val ctx = launch.executorJvmConfig
+
+ if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
+ new ExecutorHandler {
+ val exitPromise = Promise[Int]()
+ val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
+
+ override def destroy(): Unit = {
+ context.stop(app)
+ }
+ override def exitValue: Future[Int] = {
+ exitPromise.future
+ }
+ }
+ } else {
+ createProcess(ctx)
+ }
+ }
+
+ private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
+
+ val process = Future {
+ val jarPath = ctx.jar.map { appJar =>
+ val tempFile = File.createTempFile(appJar.name, ".jar")
+ jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
+ val file = new URL("file:" + tempFile)
+ file.getFile
+ }
+
+ val configFile = {
+ val configFile = File.createTempFile("gearpump", ".conf")
+ ClusterConfig.saveConfig(executorConfig, configFile)
+ val file = new URL("file:" + configFile)
+ file.getFile
+ }
+
+ val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
+ ctx.classPath.map(path => expandEnviroment(path)) ++
+ jarPath.map(Array(_)).getOrElse(Array.empty[String])
+
+ val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
+ val logArgs = List(
+ s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
+ s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
+ s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
+ s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
+ val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
+
+ val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
+
+ // Remote debug executor process
+ val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
+ val remoteDebugConfig = if (remoteDebugFlag) {
+ val availablePort = Util.findFreePort().get
+ List(
+ "-Xdebug",
+ s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
+ s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
+ )
+ } else {
+ List.empty[String]
+ }
+
+ val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
+ val verboseGCConfig = if (verboseGCFlag) {
+ List(
+ s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
+ "-verbose:gc",
+ "-XX:+PrintGCDetails",
+ "-XX:+PrintGCDateStamps",
+ "-XX:+PrintTenuringDistribution",
+ "-XX:+PrintGCApplicationConcurrentTime",
+ "-XX:+PrintGCApplicationStoppedTime"
+ )
+ } else {
+ List.empty[String]
+ }
+
+ val ipv4 = List(s"-D${PREFER_IPV4}=true")
+
+ val options = ctx.jvmArguments ++ username ++
+ logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
+
+ val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
+ options, classPath, ctx.mainClass, ctx.arguments)
+
+ ProcessInfo(process, jarPath, configFile)
+ }
+
+ new ExecutorHandler {
+
+ var destroyed = false
+
+ override def destroy(): Unit = {
+ LOG.info(s"Destroy executor process ${ctx.mainClass}")
+ if (!destroyed) {
+ destroyed = true
+ process.foreach { info =>
+ info.process.destroy()
+ info.jarPath.foreach(new File(_).delete())
+ new File(info.configFile).delete()
+ }
+ }
+ }
+
+ override def exitValue: Future[Int] = {
+ process.flatMap { info =>
+ val exit = info.process.exitValue()
+ if (exit == 0) {
+ Future.successful(0)
+ } else {
+ Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
+ s"error summary: ${info.process.logger.error}"))
+ }
+ }
+ }
+ }
+ }
+
+ private def expandEnviroment(path: String): String = {
+ // TODO: extend this to support more environment.
+ path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
+ }
+
+ override def preStart(): Unit = {
+ executorHandler.exitValue.onComplete { value =>
+ procLauncher.cleanProcess(appId, executorId)
+ val result = ExecutorResult(value)
+ self ! result
+ }
+ }
+
+ override def postStop(): Unit = {
+ executorHandler.destroy()
+ }
+
+ // The folders are under ${GEARPUMP_HOME}
+ val daemonPathPattern = List("lib" + File.separator + "yarn")
+
+ override def receive: Receive = {
+ case ShutdownExecutor(appId, executorId, reason: String) =>
+ executorHandler.destroy()
+ sender ! ShutdownExecutorSucceed(appId, executorId)
+ context.stop(self)
+ case ExecutorResult(executorResult) =>
+ executorResult match {
+ case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
+ case Failure(e) => LOG.error("Executor exit with errors", e)
+ }
+ context.stop(self)
+ }
+
+ private def getFormatedTime(timestamp: Long): String = {
+ val datePattern = "yyyy-MM-dd-HH-mm"
+ val format = new java.text.SimpleDateFormat(datePattern)
+ format.format(timestamp)
+ }
+
+ private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
+ classPath.filterNot(matchDaemonPattern(_))
+ }
+
+ private def matchDaemonPattern(path: String): Boolean = {
+ daemonPathPattern.exists(path.contains(_))
+ }
+ }
+
+ trait ExecutorHandler {
+ def destroy(): Unit
+ def exitValue: Future[Int]
+ }
+
+ case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
+
+ /**
+ * Starts the executor in the same JVM as worker; `exit` is failed on a child error, else 0.
+ */
+ class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
+ extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
+ private val exitCode = 0
+
+ override val supervisorStrategy =
+ OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
+ case ex: Throwable =>
+ LOG.error(s"system $name stopped ", ex)
+ exit.tryFailure(ex) // must not throw if the promise was already completed earlier
+ Stop
+ }
+
+ override def postStop(): Unit = {
+ if (!exit.isCompleted) {
+ exit.success(exitCode)
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
new file mode 100644
index 0000000..0a22245
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.pattern.ask
+import akka.testkit.TestActorRef
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
+import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
+import org.apache.gearpump.cluster.master.Master
+import org.apache.gearpump.cluster.worker.Worker
+import org.apache.gearpump.util.Constants
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+/**
+ * Single-JVM test cluster: one Master actor and one Worker actor living in a dedicated
+ * ActorSystem bound to 127.0.0.1.
+ *
+ * Construction blocks until the master reports at least one registered worker.
+ * Call [[shutDown]] to terminate the actor system.
+ */
+class MiniCluster {
+  private val mockMasterIP = "127.0.0.1"
+
+  implicit val system: ActorSystem = ActorSystem("system", TestUtil.MASTER_CONFIG.
+    withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP)))
+
+  val (mockMaster, worker) = {
+    val master = system.actorOf(Props(classOf[Master]), "master")
+    val worker = system.actorOf(Props(classOf[Worker], master), "worker")
+
+    // Wait until the worker has registered itself with the master
+    waitUtilWorkerIsRegistered(master)
+    (master, worker)
+  }
+
+  /** Creates a TestActorRef for `props` inside this cluster's actor system. */
+  def launchActor(props: Props): TestActorRef[Actor] = {
+    TestActorRef(props)
+  }
+
+  /**
+   * Polls the master until it lists at least one worker.
+   *
+   * The wait is unbounded, as before, but pauses briefly between polls so that the master
+   * is not hit with back-to-back GetAllWorkers requests while the worker is still starting.
+   * The interval is a literal because this runs while the class is still being constructed,
+   * before any field declared further down would be initialized.
+   */
+  private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = {
+    while (!isWorkerRegistered(master)) {
+      Thread.sleep(100)
+    }
+  }
+
+  /** Asks the master for its worker list and reports whether it is non-empty. */
+  private def isWorkerRegistered(master: ActorRef): Boolean = {
+    import scala.concurrent.duration._
+
+    implicit val futureTimeout = Constants.FUTURE_TIMEOUT
+
+    // mapTo fails the future with a ClassCastException on an unexpected reply type,
+    // instead of relying on an unchecked cast of the Future itself.
+    val workerListFuture = (master ? GetAllWorkers).mapTo[WorkerList]
+
+    val workers = Await.result(workerListFuture, 15.seconds)
+    workers.workers.nonEmpty
+  }
+
+  /** Terminates the actor system and blocks until termination has completed. */
+  def shutDown(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
new file mode 100644
index 0000000..f9b0762
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, AppManager}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess}
+import org.apache.gearpump.cluster.{TestUtil, _}
+import org.apache.gearpump.util.LogUtil
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.util.Success
+
+/**
+ * Tests AppManager by wiring it to TestProbe stand-ins for the KV service and for the
+ * AppMaster launcher, then driving it with client and AppMaster messages.
+ */
+class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+ var kvService: TestProbe = null
+ // NOTE(review): haService is declared but never assigned or read in this spec.
+ var haService: TestProbe = null
+ var appLauncher: TestProbe = null
+ var appManager: ActorRef = null
+ private val LOG = LogUtil.getLogger(getClass)
+
+ override def config: Config = TestUtil.DEFAULT_CONFIG
+
+ // Starts a fresh actor system, fresh probes and a fresh AppManager for every test case.
+ override def beforeEach(): Unit = {
+ startActorSystem()
+ kvService = TestProbe()(getActorSystem)
+ appLauncher = TestProbe()(getActorSystem)
+
+ appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
+ new DummyAppMasterLauncherFactory(appLauncher))))
+ // The new AppManager sends a GetKV to the KV service; answer it with an empty MasterState.
+ kvService.expectMsgType[GetKV]
+ kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty)))
+ }
+
+ override def afterEach(): Unit = {
+ shutdownActorSystem()
+ }
+
+ // Register an AppMaster, then activate it, and expect an acknowledgement for each step.
+ "AppManager" should "handle AppMaster message correctly" in {
+ val appMaster = TestProbe()(getActorSystem)
+ val appId = 1
+
+ val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName"))
+ appMaster.send(appManager, register)
+ appMaster.expectMsgType[AppMasterRegistered]
+
+ appMaster.send(appManager, ActivateAppMaster(appId))
+ appMaster.expectMsgType[AppMasterActivated]
+ }
+
+ // SaveAppData / GetAppData are expected to be relayed to the KV service as PutKV / GetKV,
+ // and the KV replies are expected to come back to the requester as AppDataSaved /
+ // GetAppDataResult.
+ "DataStoreService" should "support Put and Get" in {
+ val appMaster = TestProbe()(getActorSystem)
+ appMaster.send(appManager, SaveAppData(0, "key", 1))
+ kvService.expectMsgType[PutKV]
+ kvService.reply(PutKVSuccess)
+ appMaster.expectMsg(AppDataSaved)
+
+ appMaster.send(appManager, GetAppData(0, "key"))
+ kvService.expectMsgType[GetKV]
+ kvService.reply(GetKVSuccess("key", 1))
+ appMaster.expectMsg(GetAppDataResult("key", 1))
+ }
+
+ "AppManager" should "support application submission and shutdown" in {
+ testClientSubmission(withRecover = false)
+ }
+
+ "AppManager" should "support application submission and recover if appmaster dies" in {
+ LOG.info("=================testing recover==============")
+ testClientSubmission(withRecover = true)
+ }
+
+ // No application has been submitted here, so requests for app id 1 must yield failures
+ // (or AppMasterNonExist for the data request).
+ "AppManager" should "handle client message correctly" in {
+ val mockClient = TestProbe()(getActorSystem)
+ mockClient.send(appManager, ShutdownApplication(1))
+ assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure)
+
+ mockClient.send(appManager, ResolveAppId(1))
+ assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
+
+ mockClient.send(appManager, AppMasterDataRequest(1))
+ mockClient.expectMsg(AppMasterData(AppMasterNonExist))
+ }
+
+ // Submits the same application twice; the second submission must be answered with a failure.
+ "AppManager" should "reject the application submission if the app name already existed" in {
+ val app = TestUtil.dummyApp
+ val submit = SubmitApplication(app, None, "username")
+ val client = TestProbe()(getActorSystem)
+ val appMaster = TestProbe()(getActorSystem)
+ // NOTE(review): worker is created but not used in this test.
+ val worker = TestProbe()(getActorSystem)
+ val appId = 1
+
+ client.send(appManager, submit)
+
+ kvService.expectMsgType[PutKV]
+ appLauncher.expectMsg(LauncherStarted(appId))
+ appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+ AppMasterRuntimeInfo(appId, app.name)))
+ appMaster.expectMsgType[AppMasterRegistered]
+
+ client.send(appManager, submit)
+ assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
+ }
+
+ /**
+ * Shared scenario: submit an application, register its AppMaster, resolve and query it,
+ * then either shut it down or stop the AppMaster and expect the launcher to start again.
+ *
+ * @param withRecover when true, the AppMaster probe is stopped and a relaunch is expected;
+ * when false, the application is shut down through ShutdownApplication
+ */
+ def testClientSubmission(withRecover: Boolean): Unit = {
+ val app = TestUtil.dummyApp
+ val submit = SubmitApplication(app, None, "username")
+ val client = TestProbe()(getActorSystem)
+ val appMaster = TestProbe()(getActorSystem)
+ // NOTE(review): worker is created but not used in this scenario.
+ val worker = TestProbe()(getActorSystem)
+ val appId = 1
+
+ client.send(appManager, submit)
+
+ // A PutKV is expected after the submission, and another one after the registration below.
+ kvService.expectMsgType[PutKV]
+ appLauncher.expectMsg(LauncherStarted(appId))
+ appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+ AppMasterRuntimeInfo(appId, app.name)))
+ kvService.expectMsgType[PutKV]
+ appMaster.expectMsgType[AppMasterRegistered]
+
+ client.send(appManager, ResolveAppId(appId))
+ client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
+
+ client.send(appManager, AppMastersDataRequest)
+ client.expectMsgType[AppMastersData]
+
+ client.send(appManager, AppMasterDataRequest(appId, false))
+ client.expectMsgType[AppMasterData]
+
+ if (!withRecover) {
+ client.send(appManager, ShutdownApplication(appId))
+ client.expectMsg(ShutdownApplicationResult(Success(appId)))
+ } else {
+ // Do recovery
+ // Stopping the AppMaster probe is expected to make AppManager read the stored
+ // application state (GetKV) and start a launcher for the same app id again.
+ getActorSystem.stop(appMaster.ref)
+ kvService.expectMsgType[GetKV]
+ val appState = ApplicationState(appId, "application1", 1, app, None, "username", null)
+ kvService.reply(GetKVSuccess(APP_STATE, appState))
+ appLauncher.expectMsg(LauncherStarted(appId))
+ }
+ }
+}
+
+/**
+ * Launcher factory for tests: every request yields Props for a [[DummyAppMasterLauncher]]
+ * bound to the given probe. Only `appId` is used; the other arguments are ignored.
+ */
+class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory {
+  override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props =
+    Props(new DummyAppMasterLauncher(test, appId))
+}
+
+/**
+ * Stand-in launcher actor: announces itself to the probe with LauncherStarted(appId) on
+ * construction and afterwards forwards every message it receives to that probe.
+ */
+class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
+  private val probe = test.ref
+
+  probe ! LauncherStarted(appId)
+
+  override def receive: Receive = {
+    case message => probe.forward(message)
+  }
+}
+
+/** Sent by [[DummyAppMasterLauncher]] to its probe when the launcher actor is created. */
+case class LauncherStarted(appId: Int)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
new file mode 100644
index 0000000..d3e739f
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.Props
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.master.InMemoryKVService
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+/**
+ * Checks the put / get / delete-group protocol of InMemoryKVService through a TestProbe.
+ */
+class InMemoryKVServiceSpec
+  extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  override def config: Config = TestUtil.MASTER_CONFIG
+
+  override def beforeEach(): Unit = startActorSystem()
+
+  override def afterEach(): Unit = shutdownActorSystem()
+
+  "KVService" should "get, put, delete correctly" in {
+    val actorSystem = getActorSystem
+    val store = actorSystem.actorOf(Props(new InMemoryKVService()))
+    val probe = TestProbe()(actorSystem)
+    val groupName = "group"
+    val quietPeriod = 3.seconds
+
+    // Two puts on the same key: the second value must win.
+    probe.send(store, PutKV(groupName, "key", 1))
+    probe.expectMsg(PutKVSuccess)
+    probe.send(store, PutKV(groupName, "key", 2))
+    probe.expectMsg(PutKVSuccess)
+
+    probe.send(store, GetKV(groupName, "key"))
+    probe.expectMsg(GetKVSuccess("key", 2))
+
+    probe.send(store, DeleteKVGroup(groupName))
+
+    // Once the group is deleted, neither Get nor Put for it is answered any more.
+    probe.send(store, GetKV(groupName, "key"))
+    probe.expectNoMsg(quietPeriod)
+
+    probe.send(store, PutKV(groupName, "key", 3))
+    probe.expectNoMsg(quietPeriod)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
new file mode 100644
index 0000000..2166976
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.Properties
+
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _}
+import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
+import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{Constants, LogUtil, Util}
+import org.scalatest._
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+/**
+ * Smoke tests for the command entry points (Worker, Master, Info, Kill, Replay, Local, Gear).
+ *
+ * Most cases start the entry point, either as a child process or inside a Future, and
+ * check the first message(s) it sends to a mock master created through MasterHarness.
+ */
+class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "Worker" should "register worker address to master when started." in {
+
+    val masterReceiver = createMockMaster()
+
+    val tempTestConf = convertTestConf(getHost, getPort)
+
+    try {
+      val options = Array(
+        s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
+        s"-D${PREFER_IPV4}=true"
+      ) ++ getMasterListOption()
+
+      val worker = Util.startProcess(options,
+        getContextClassPath,
+        getMainClassName(Worker),
+        Array.empty)
+
+      try {
+        masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
+      } finally {
+        worker.destroy()
+      }
+    } finally {
+      // Delete the temporary config even when the process fails to start or the expected
+      // message never arrives; previously it was removed on the success path only.
+      tempTestConf.delete()
+    }
+  }
+
+  "Master" should "accept worker RegisterNewWorker when started" in {
+    val worker = TestProbe()(getActorSystem)
+
+    val host = "127.0.0.1"
+    val port = Util.findFreePort().get
+
+    val properties = new Properties()
+    properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port")
+    properties.put(s"${GEARPUMP_HOSTNAME}", s"$host")
+    val masterConfig = ConfigFactory.parseProperties(properties)
+      .withFallback(TestUtil.MASTER_CONFIG)
+    // Master.main is started inside a Future so that it runs off the test thread.
+    Future {
+      Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString))
+    }
+
+    val masterProxy = getActorSystem.actorOf(
+      MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
+
+    worker.send(masterProxy, RegisterNewWorker)
+    worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
+  }
+
+  "Info" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
+    masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName"))))
+  }
+
+  "Kill" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Kill.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
+    masterReceiver.reply(ShutdownApplicationResult(Success(0)))
+  }
+
+  "Replay" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Replay.main(masterConfig, Array("-appid", "0"))
+    }
+
+    // The mock master resolves the app id to itself, so the replay request that follows
+    // arrives at the same probe.
+    masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
+    masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ReplayApplicationResult(Success(0)))
+  }
+
+  "Local" should "be started without exception" in {
+    val port = Util.findFreePort().get
+    val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
+      s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
+      s"-D${PREFER_IPV4}=true")
+
+    val local = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Local),
+      Array.empty)
+
+    // Evaluates `fn` until it yields true, sleeping one second between attempts, for at
+    // most `times` retries after the first attempt.
+    def retry(times: Int)(fn: => Boolean): Boolean = {
+
+      LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..")
+
+      val result = fn
+      if (result || times <= 0) {
+        result
+      } else {
+        Thread.sleep(1000)
+        retry(times - 1)(fn)
+      }
+    }
+
+    try {
+      assert(retry(10)(isPortUsed("127.0.0.1", port)),
+        "local is not started successfully, as port is not used " + port)
+    } finally {
+      local.destroy()
+    }
+  }
+
+  "Gear" should "support app|info|kill|shell|replay" in {
+
+    val commands = Array("app", "info", "kill", "shell", "replay")
+
+    assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
+
+    // NOTE(review): `command` only appears in the failure message; Gear.main receives
+    // Array("-noexist") on every iteration, so the same call is asserted five times.
+    // Presumably Array(command, "-noexist") was intended; confirm how each command
+    // reacts to an unknown option before changing it.
+    for (command <- commands) {
+      assert(Try(Gear.main(Array("-noexist"))).isFailure,
+        "pass unknown option, throw, command: " + command)
+    }
+
+    assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ")
+
+    val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
+    assert(tryThis.isFailure, "unknown command, throw")
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
new file mode 100644
index 0000000..b48fc2a
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.TestUtil
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Verifies that a MasterWatcher actor stops itself within five seconds when started in
+ * an actor system built from TestUtil.MASTER_CONFIG.
+ */
+class MasterWatcherSpec extends FlatSpec with Matchers {
+  def config: Config = TestUtil.MASTER_CONFIG
+
+  "MasterWatcher" should "kill itself when can not get a quorum" in {
+    val system = ActorSystem("ForMasterWatcher", config)
+
+    try {
+      val actorWatcher = TestProbe()(system)
+
+      val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
+      actorWatcher watch masterWatcher
+      actorWatcher.expectTerminated(masterWatcher, 5.seconds)
+    } finally {
+      // Terminate the actor system even when the assertion above fails, so a failing
+      // run does not leave the system and its threads behind.
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
new file mode 100644
index 0000000..8a3d7d1
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.duration._
+
+/**
+ * Behavioural tests for PriorityScheduler.
+ *
+ * Each case creates its own scheduler actor and talks to it on behalf of a mock AppMaster
+ * and two mock workers. The probes and the actor system are shared by all cases.
+ */
+class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
+  with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
+  val appId = 0
+  val workerId1: WorkerId = WorkerId(1, 0L)
+  val workerId2: WorkerId = WorkerId(2, 0L)
+  val mockAppMaster = TestProbe()
+  val mockWorker1 = TestProbe()
+  val mockWorker2 = TestProbe()
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  /**
+   * Compares two ResourceAllocated messages after sorting the allocations of both sides
+   * by worker id.
+   */
+  def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = {
+    left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
+  }
+
+  /**
+   * Waits up to five seconds for the next message of the mock AppMaster and requires it
+   * to be a ResourceAllocated equal to `expected` according to [[sameElement]].
+   */
+  private def expectAllocated(expected: ResourceAllocated): Unit = {
+    mockAppMaster.expectMsgPF(5.seconds) {
+      case actual: ResourceAllocated if sameElement(actual, expected) => ()
+    }
+  }
+
+  "The scheduler" should {
+    "update resource only when the worker is registered" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
+      expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " +
+        s"registered into master"))
+    }
+
+    "drop application's resource requests when the application is removed" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      // Resources became available only after the application finished: nothing may be
+      // allocated to it any more.
+      mockAppMaster.expectNoMsg(5.seconds)
+    }
+  }
+
+  "The resource request with higher priority" should {
+    "be handled first" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY)
+      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY)
+
+      // request1 is submitted twice on purpose: worker1 can serve only one of the two.
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      // Expected order of service: HIGH (30), NORMAL (20), then the first LOW (40).
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))))
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))))
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      // The second LOW request is served once worker2 offers resources.
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
+    }
+  }
+
+  "The resource request which delivered earlier" should {
+    "be handled first if the priorities are the same" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))))
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different relaxation" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER)
+      val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      // Only worker1 is available, so the request pinned to worker1 is served first even
+      // though the request pinned to worker2 has the higher priority.
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
+
+      val request3 = ResourceRequest(
+        Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+
+      // 30 slots over 2 executors: 15 on each worker.
+      expectAllocated(ResourceAllocated(Array(
+        ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
+        ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))))
+
+      // We have to manually update the resource on each worker
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+
+      // Only worker1 (65) can hold all 60 slots on a single worker.
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))))
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different executor number" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      // By default, the request requires only one executor
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations2.allocations.length == 1)
+      assert(allocations2.allocations.head.resource == Resource(20))
+
+      val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations3.allocations.length == 3)
+      assert(allocations3.allocations.forall(_.resource == Resource(8)))
+
+      // The total available resource can not satisfy the requirements with executor number
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+      val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations4.allocations.length == 2)
+      assert(allocations4.allocations.forall(_.resource == Resource(20)))
+
+      // When new resources are available, the remaining request will be satisfied
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
+      val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations5.allocations.length == 1)
+      // Check the newly received allocation (this used to re-check allocations4).
+      assert(allocations5.allocations.forall(_.resource == Resource(20)))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
new file mode 100644
index 0000000..e0233f8
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import akka.actor.{ActorSystem, PoisonPill, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed}
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
+import org.scalatest._
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Behavioural tests for the [[Worker]] actor.
+ *
+ * The worker is driven through Akka TestKit probes that stand in for the master, the master
+ * proxy and an application-side client. A fresh actor system and fresh probes are created
+ * before every test case and the actor system is shut down afterwards.
+ */
+class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  val appId = 1
+  val workerId: WorkerId = WorkerId(1, 0L)
+  val executorId = 1
+  // The probes are (re)created in beforeEach; they hold no usable value before that.
+  var masterProxy: TestProbe = _
+  var mockMaster: TestProbe = _
+  var client: TestProbe = _
+  val workerSlots = 50
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    mockMaster = TestProbe()(getActorSystem)
+    masterProxy = TestProbe()(getActorSystem)
+    client = TestProbe()(getActorSystem)
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "The new started worker" should {
+    "kill itself if no response from Master after registering" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+      mockMaster watch worker
+      mockMaster.expectMsg(RegisterNewWorker)
+      // The master probe never answers, so the worker is expected to stop on its own.
+      mockMaster.expectTerminated(worker, 60.seconds)
+    }
+  }
+
+  "Worker" should {
+    "init its resource from the gearpump config" in {
+      // Override only the slot count; everything else falls back to the default test config.
+      val workerConfig = ConfigFactory
+        .parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots")
+        .withFallback(TestUtil.DEFAULT_CONFIG)
+      val workerSystem = ActorSystem("WorkerSystem", workerConfig)
+      // Always terminate the extra actor system, even when an expectation below fails,
+      // so that a failing test does not leak it into the rest of the suite.
+      try {
+        val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+        mockMaster watch worker
+        mockMaster.expectMsg(RegisterNewWorker)
+
+        worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+        mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots)))
+
+        // A failed resource update is expected to make the worker stop.
+        worker.tell(
+          UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
+        mockMaster.expectTerminated(worker, 5.seconds)
+      } finally {
+        workerSystem.terminate()
+        Await.result(workerSystem.whenTerminated, Duration.Inf)
+      }
+    }
+
+    "update its remaining resource when launching and shutting down executors" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref))
+      masterProxy.expectMsg(RegisterNewWorker)
+
+      // Registration is confirmed by mockMaster, which then receives the resource reports.
+      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+
+      val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
+      // This is an actor path which the ActorSystemBooter will report back to,
+      // not needed in this test
+      val reportBack = "dummy"
+      val executionContext = ExecutorJVMConfig(Array.empty[String],
+        getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "),
+        classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None,
+        username = "user")
+
+      // Test LaunchExecutor: asking for more than the 100 reported slots must be rejected
+      worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext),
+        mockMaster.ref)
+      mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine"))
+
+      // Launching with 5 slots leaves 95 free
+      worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
+
+      // Changing the executor's resource to 2 slots leaves 98 free
+      worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
+
+      // Test terminationWatch: shutting the executor down frees all of its slots
+      worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+      client.expectMsg(ShutdownExecutorSucceed(appId, executorId))
+
+      // Shutting down an executor that was never launched is reported as a failure
+      worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref)
+      client.expectMsg(ShutdownExecutorFailed(
+        s"Can not find executor ${executorId + 1} for app $appId"))
+
+      // Once the master is gone, the worker re-registers through the proxy with its old id
+      mockMaster.ref ! PoisonPill
+      masterProxy.expectMsg(RegisterWorker(workerId))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
deleted file mode 100644
index 9e55be6..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster
-
-import akka.actor.ActorRef
-
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.worker.WorkerId
-
-/**
- * Cluster Bootup Flow
- */
-object WorkerToMaster {
-
- /** When an worker is started, it sends RegisterNewWorker */
- case object RegisterNewWorker
-
- /** When worker lose connection with master, it tries to register itself again with old Id. */
- case class RegisterWorker(workerId: WorkerId)
-
- /** Worker is responsible to broadcast its current status to master */
- case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
-}
-
-object MasterToWorker {
-
- /** Master confirm the reception of RegisterNewWorker or RegisterWorker */
- case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
-
- /** Worker have not received reply from master */
- case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
-
- /** Master is synced with worker on resource slots managed by current worker */
- case object UpdateResourceSucceed
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
deleted file mode 100644
index 9bde4d1..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.embedded
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import com.typesafe.config.{Config, ConfigValueFactory}
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.master.{Master => MasterActor}
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
-import org.apache.gearpump.util.{LogUtil, Util}
-
-/**
- * Create a in-process cluster with single worker
- */
-class EmbeddedCluster(inputConfig: Config) {
-
- private val workerCount: Int = 1
- private var _master: ActorRef = null
- private var _system: ActorSystem = null
- private var _config: Config = null
-
- private val LOG = LogUtil.getLogger(getClass)
-
- def start(): Unit = {
- val port = Util.findFreePort().get
- val akkaConf = getConfig(inputConfig, port)
- _config = akkaConf
- val system = ActorSystem(MASTER, akkaConf)
-
- val master = system.actorOf(Props[MasterActor], MASTER)
-
- 0.until(workerCount).foreach { id =>
- system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
- }
- this._master = master
- this._system = system
-
- LOG.info("=================================")
- LOG.info("Local Cluster is started at: ")
- LOG.info(s" 127.0.0.1:$port")
- LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
- }
-
- private def getConfig(inputConfig: Config, port: Int): Config = {
- val config = inputConfig.
- withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
- withValue(GEARPUMP_CLUSTER_MASTERS,
- ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
- withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
- ConfigValueFactory.fromAnyRef(true)).
- withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
- withValue("akka.actor.provider",
- ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
- config
- }
-
- def newClientContext: ClientContext = {
- ClientContext(_config, _system, _master)
- }
-
- def stop(): Unit = {
- _system.stop(_master)
- _system.terminate()
- Await.result(_system.whenTerminated, Duration.Inf)
- }
-}
-
-object EmbeddedCluster {
- def apply(): EmbeddedCluster = {
- new EmbeddedCluster(ClusterConfig.master())
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
deleted file mode 100644
index db71b7b..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.{Master => MasterActor}
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
-
-object Local extends AkkaApp with ArgumentsParser {
- override def akkaConfig: Config = ClusterConfig.master()
-
- var LOG: Logger = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] =
- Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
- "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
- defaultValue = Some(2)))
-
- override val description = "Start a local cluster"
-
- def main(akkaConf: Config, args: Array[String]): Unit = {
-
- this.LOG = {
- LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL)
- LogUtil.getLogger(getClass)
- }
-
- val config = parse(args)
- if (null != config) {
- local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
- }
- }
-
- def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
- if (sameProcess) {
- LOG.info("Starting local in same process")
- System.setProperty("LOCAL", "true")
- }
- val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
- .asScala.flatMap(Util.parseHostList)
- val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
-
- if (masters.size != 1 && masters.head.host != local) {
- LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " +
- s"with ${Constants.GEARPUMP_HOSTNAME}")
- } else {
-
- val hostPort = masters.head
- implicit val system = ActorSystem(MASTER, akkaConf.
- withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port))
- )
-
- val master = system.actorOf(Props[MasterActor], MASTER)
- val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER"
-
- 0.until(workerCount).foreach { id =>
- system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
- }
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
deleted file mode 100644
index f1b9bdf..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor._
-import akka.cluster.ClusterEvent._
-import akka.cluster.ddata.DistributedData
-import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}
-import akka.cluster.{Cluster, Member, MemberStatus}
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode}
-import org.apache.gearpump.cluster.master.Master.MasterListUpdated
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
-
-object Master extends AkkaApp with ArgumentsParser {
-
- private var LOG: Logger = LogUtil.getLogger(getClass)
-
- override def akkaConfig: Config = ClusterConfig.master()
-
- override val options: Array[(String, CLIOption[Any])] =
- Array("ip" -> CLIOption[String]("<master ip address>", required = true),
- "port" -> CLIOption("<master port>", required = true))
-
- override val description = "Start Master daemon"
-
- def main(akkaConf: Config, args: Array[String]): Unit = {
-
- this.LOG = {
- LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
- LogUtil.getLogger(getClass)
- }
-
- val config = parse(args)
- master(config.getString("ip"), config.getInt("port"), akkaConf)
- }
-
- private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = {
- masters.exists { hostPort =>
- hostPort == s"$master:$port"
- }
- }
-
- private def master(ip: String, port: Int, akkaConf: Config): Unit = {
- val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
-
- if (!verifyMaster(ip, port, masters)) {
- LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
- s"gearpump.cluster.masters: ${masters.mkString(", ")}")
- System.exit(-1)
- }
-
- val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
- val quorum = masterList.size() / 2 + 1
- val masterConfig = akkaConf.
- withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
- withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
- withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
- withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
- ConfigValueFactory.fromAnyRef(quorum))
-
- LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
- val system = ActorSystem(MASTER, masterConfig)
-
- val replicator = DistributedData(system).replicator
- LOG.info(s"Replicator path: ${replicator.path}")
-
- // Starts singleton manager
- val singletonManager = system.actorOf(ClusterSingletonManager.props(
- singletonProps = Props(classOf[MasterWatcher], MASTER),
- terminationMessage = PoisonPill,
- settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
- .withRole(MASTER)),
- name = SINGLETON_MANAGER)
-
- // Start master proxy
- val masterProxy = system.actorOf(ClusterSingletonProxy.props(
- singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
- // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
- // Master is created when there is a majority of machines started.
- settings = ClusterSingletonProxySettings(system)
- .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
- name = MASTER
- )
-
- LOG.info(s"master proxy is started at ${masterProxy.path}")
-
- val mainThread = Thread.currentThread()
- Runtime.getRuntime().addShutdownHook(new Thread() {
- override def run(): Unit = {
- if (!system.whenTerminated.isCompleted) {
- LOG.info("Triggering shutdown hook....")
-
- system.stop(masterProxy)
- val cluster = Cluster(system)
- cluster.leave(cluster.selfAddress)
- cluster.down(cluster.selfAddress)
- try {
- Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
- } catch {
- case ex: Exception => // Ignore
- }
- system.terminate()
- mainThread.join()
- }
- }
- })
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
-
-class MasterWatcher(role: String) extends Actor with ActorLogging {
- import context.dispatcher
-
- val cluster = Cluster(context.system)
-
- val config = context.system.settings.config
- val masters = config.getList("akka.cluster.seed-nodes")
- val quorum = masters.size() / 2 + 1
-
- val system = context.system
-
- // Sorts by age, oldest first
- val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
- var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
-
- def receive: Receive = null
-
- // Subscribes to MemberEvent, re-subscribe when restart
- override def preStart(): Unit = {
- cluster.subscribe(self, classOf[MemberEvent])
- context.become(waitForInit)
- }
- override def postStop(): Unit = {
- cluster.unsubscribe(self)
- }
-
- def matchingRole(member: Member): Boolean = member.hasRole(role)
-
- def waitForInit: Receive = {
- case state: CurrentClusterState => {
- membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
- m.status == MemberStatus.Up && matchingRole(m))
-
- if (membersByAge.size < quorum) {
- membersByAge.iterator.mkString(",")
- log.info(s"We cannot get a quorum, $quorum, " +
- s"shutting down...${membersByAge.iterator.mkString(",")}")
- context.become(waitForShutdown)
- self ! MasterWatcher.Shutdown
- } else {
- val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
- notifyMasterMembersChange(master)
- context.become(waitForClusterEvent(master))
- }
- }
- }
-
- def waitForClusterEvent(master: ActorRef): Receive = {
- case MemberUp(m) if matchingRole(m) => {
- membersByAge += m
- notifyMasterMembersChange(master)
- }
- case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] ||
- mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
- log.info(s"member removed ${mEvent.member}")
- val m = mEvent.member
- membersByAge -= m
- if (membersByAge.size < quorum) {
- log.info(s"We cannot get a quorum, $quorum, " +
- s"shutting down...${membersByAge.iterator.mkString(",")}")
- context.become(waitForShutdown)
- self ! MasterWatcher.Shutdown
- } else {
- notifyMasterMembersChange(master)
- }
- }
- }
-
- private def notifyMasterMembersChange(master: ActorRef): Unit = {
- val masters = membersByAge.toList.map{ member =>
- MasterNode(member.address.host.getOrElse("Unknown-Host"),
- member.address.port.getOrElse(0))
- }
- master ! MasterListUpdated(masters)
- }
-
- def waitForShutdown: Receive = {
- case MasterWatcher.Shutdown => {
- cluster.unsubscribe(self)
- cluster.leave(cluster.selfAddress)
- context.stop(self)
- system.scheduler.scheduleOnce(Duration.Zero) {
- try {
- Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
- } catch {
- case ex: Exception => // Ignore
- }
- system.terminate()
- }
- }
- }
-}
-
-object MasterWatcher {
- object Shutdown
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
deleted file mode 100644
index 58a9dec..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to start a worker daemon process */
-object Worker extends AkkaApp with ArgumentsParser {
- protected override def akkaConfig = ClusterConfig.worker()
-
- override val description = "Start a worker daemon"
-
- var LOG: Logger = LogUtil.getLogger(getClass)
-
- private def uuid = java.util.UUID.randomUUID.toString
-
- def main(akkaConf: Config, args: Array[String]): Unit = {
- val id = uuid
-
- this.LOG = {
- LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
- // Delay creation of LOG instance to avoid creating an empty log file as we
- // reset the log file name here
- LogUtil.getLogger(getClass)
- }
-
- val system = ActorSystem(id, akkaConf)
-
- val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { address =>
- val hostAndPort = address.split(":")
- HostPort(hostAndPort(0), hostAndPort(1).toInt)
- }
-
- LOG.info(s"Trying to connect to masters " + masterAddress.mkString(",") + "...")
- val masterProxy = system.actorOf(MasterProxy.props(masterAddress), s"masterproxy${system.name}")
-
- system.actorOf(Props(classOf[WorkerActor], masterProxy),
- classOf[WorkerActor].getSimpleName + id)
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
\ No newline at end of file
[2/4] incubator-gearpump git commit: [GEARPUMP-224] merge
gearpump-daemon to gearpump-core
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
deleted file mode 100644
index 9a3a119..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.util.{Failure, Success}
-
-import akka.actor._
-import akka.pattern.ask
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
-import org.apache.gearpump.cluster.AppMasterToWorker._
-import org.apache.gearpump.cluster.ClientToMaster._
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
-import org.apache.gearpump.cluster.MasterToClient._
-import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
-import org.apache.gearpump.cluster.master.AppManager._
-import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
-import org.apache.gearpump.cluster.master.Master._
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
-
-/**
- * AppManager is dedicated child of Master to manager all applications.
- */
-private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
- extends Actor with Stash with TimeOutScheduler {
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID
- private val appMasterMaxRetries: Int = 5
- private val appMasterRetryTimeRange: Duration = 20.seconds
-
- implicit val timeout = FUTURE_TIMEOUT
- implicit val executionContext = context.dispatcher
-
- // Next available appId
- private var nextAppId: Int = 1
-
- // From appId to appMaster data
- // Applications not in activeAppMasters or deadAppMasters are in pending status
- private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
-
- // Active appMaster list where applications are in active status
- private var activeAppMasters = Set.empty[Int]
-
- // Dead appMaster list where applications are in inactive status
- private var deadAppMasters = Set.empty[Int]
-
- private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
-
- def receive: Receive = null
-
- kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
- context.become(waitForMasterState)
-
- def waitForMasterState: Receive = {
- case GetKVSuccess(_, result) =>
- val masterState = result.asInstanceOf[MasterState]
- if (masterState != null) {
- this.nextAppId = masterState.maxId + 1
- this.activeAppMasters = masterState.activeAppMasters
- this.deadAppMasters = masterState.deadAppMasters
- this.appMasterRegistry = masterState.appMasterRegistry
- }
- context.become(receiveHandler)
- unstashAll()
- case GetKVFailed(ex) =>
- LOG.error("Failed to get master state, shutting down master to avoid data corruption...")
- context.parent ! PoisonPill
- case msg =>
- LOG.info(s"Get message ${msg.getClass.getSimpleName}")
- stash()
- }
-
- def receiveHandler: Receive = {
- val msg = "Application Manager started. Ready for application submission..."
- LOG.info(msg)
- clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
- appDataStoreService orElse terminationWatch
- }
-
- def clientMsgHandler: Receive = {
- case SubmitApplication(app, jar, username) =>
- LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...")
- val client = sender()
- if (applicationNameExist(app.name)) {
- client ! SubmitApplicationResult(Failure(
- new Exception(s"Application name ${app.name} already existed")))
- } else {
- context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent,
- Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
-
- val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null)
- appMasterRestartPolicies += nextAppId ->
- new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
- kvService ! PutKV(nextAppId.toString, APP_STATE, appState)
- nextAppId += 1
- }
-
- case RestartApplication(appId) =>
- val client = sender()
- (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
- case GetKVSuccess(_, result) =>
- val appState = result.asInstanceOf[ApplicationState]
- if (appState != null) {
- LOG.info(s"Shutting down the application (restart), $appId")
- self ! ShutdownApplication(appId)
- self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client)
- } else {
- client ! SubmitApplicationResult(Failure(
- new Exception(s"Failed to restart, because the application $appId does not exist.")
- ))
- }
- case GetKVFailed(ex) =>
- client ! SubmitApplicationResult(Failure(
- new Exception(s"Unable to obtain the Master State. " +
- s"Application $appId will not be restarted.")
- ))
- }
-
- case ShutdownApplication(appId) =>
- LOG.info(s"App Manager Shutting down application $appId")
- val (_, appInfo) = appMasterRegistry.get(appId)
- .filter { case (_, info) => !deadAppMasters.contains(info.appId)}
- .getOrElse((null, null))
- Option(appInfo) match {
- case Some(info) =>
- val worker = info.worker
- val workerPath = Option(worker).map(_.path).orNull
- LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId: $EXECUTOR_ID")
- cleanApplicationData(appId)
- val shutdown = ShutdownExecutor(appId, EXECUTOR_ID,
- s"AppMaster $appId shutdown requested by master...")
- sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
- sender ! ShutdownApplicationResult(Success(appId))
- case None =>
- val errorMsg = s"Failed to find registration information for appId: $appId"
- LOG.error(errorMsg)
- sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
- }
-
- case ResolveAppId(appId) =>
- val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
- if (null != appMaster) {
- sender ! ResolveAppIdResult(Success(appMaster))
- } else {
- sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
- }
-
- case AppMastersDataRequest =>
- var appMastersData = collection.mutable.ListBuffer[AppMasterData]()
- appMasterRegistry.foreach(pair => {
- val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
- val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
- val workerPath = Option(info.worker).map(worker =>
- ActorUtil.getFullPath(context.system, worker.path))
- val status = getAppMasterStatus(id)
- appMastersData += AppMasterData(
- status, id, info.appName, appMasterPath, workerPath.orNull,
- info.submissionTime, info.startTime, info.finishTime, info.user)
- })
-
- sender ! AppMastersData(appMastersData.toList)
-
- case QueryAppMasterConfig(appId) =>
- val config =
- if (appMasterRegistry.contains(appId)) {
- val (_, info) = appMasterRegistry(appId)
- info.config
- } else {
- null
- }
- sender ! AppMasterConfig(config)
-
- case appMasterDataRequest: AppMasterDataRequest =>
- val appId = appMasterDataRequest.appId
- val appStatus = getAppMasterStatus(appId)
-
- appStatus match {
- case AppMasterNonExist =>
- sender ! AppMasterData(AppMasterNonExist)
- case _ =>
- val (appMaster, info) = appMasterRegistry(appId)
- val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
- val workerPath = Option(info.worker).map(
- worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
- sender ! AppMasterData(
- appStatus, appId, info.appName, appMasterPath, workerPath,
- info.submissionTime, info.startTime, info.finishTime, info.user)
- }
- }
-
- def workerMessage: Receive = {
- case ShutdownExecutorSucceed(appId, executorId) =>
- LOG.info(s"Shut down executor $executorId for application $appId successfully")
- case failed: ShutdownExecutorFailed =>
- LOG.error(failed.reason)
- }
-
- private def getAppMasterStatus(appId: Int): AppMasterStatus = {
- if (activeAppMasters.contains(appId)) {
- AppMasterActive
- } else if (deadAppMasters.contains(appId)) {
- AppMasterInActive
- } else if (appMasterRegistry.contains(appId)) {
- AppMasterPending
- } else {
- AppMasterNonExist
- }
- }
-
- private def shutDownExecutorTimeOut(): Unit = {
- LOG.error(s"Shut down executor time out")
- }
-
- def appMasterMessage: Receive = {
- case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
- val startTime = System.currentTimeMillis()
- val register = registerBack.copy(startTime = startTime)
-
- LOG.info(s"Register AppMaster for app: ${register.appId}, $register")
- context.watch(appMaster)
- appMasterRegistry += register.appId -> (appMaster, register)
- kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
- MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
- sender ! AppMasterRegistered(register.appId)
-
- case ActivateAppMaster(appId) =>
- LOG.info(s"Activate AppMaster for app $appId")
- activeAppMasters += appId
- kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
- MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
- sender ! AppMasterActivated(appId)
- }
-
- def appDataStoreService: Receive = {
- case SaveAppData(appId, key, value) =>
- val client = sender()
- (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map {
- case PutKVSuccess =>
- client ! AppDataSaved
- case PutKVFailed(k, ex) =>
- client ! SaveAppDataFailed
- }
- case GetAppData(appId, key) =>
- val client = sender()
- (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map {
- case GetKVSuccess(privateKey, value) =>
- client ! GetAppDataResult(key, value)
- case GetKVFailed(ex) =>
- client ! GetAppDataResult(key, null)
- }
- }
-
- def terminationWatch: Receive = {
- case terminate: Terminated =>
- LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " +
- s"network down: ${terminate.getAddressTerminated}")
-
- // Now we assume that the only normal way to stop the application is submitting a
- // ShutdownApplication request
- val application = appMasterRegistry.find { appInfo =>
- val (_, (actorRef, _)) = appInfo
- actorRef.compareTo(terminate.actor) == 0
- }
- if (application.nonEmpty) {
- val appId = application.get._1
- (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
- case GetKVSuccess(_, result) =>
- val appState = result.asInstanceOf[ApplicationState]
- if (appState != null) {
- LOG.info(s"Recovering application, $appId")
- self ! RecoverApplication(appState)
- } else {
- LOG.error(s"Cannot find application state for $appId")
- }
- case GetKVFailed(ex) =>
- LOG.error(s"Cannot find master state to recover")
- }
- }
- }
-
- def selfMsgHandler: Receive = {
- case RecoverApplication(state) =>
- val appId = state.appId
- if (appMasterRestartPolicies.get(appId).get.allowRestart) {
- LOG.info(s"AppManager Recovering Application $appId...")
- activeAppMasters -= appId
- kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
- MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
- context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username,
- context.parent, None), s"launcher${appId}_${Util.randInt()}")
- } else {
- LOG.error(s"Application $appId failed too many times")
- }
- }
-
- case class RecoverApplication(applicationStatus: ApplicationState)
-
- private def cleanApplicationData(appId: Int): Unit = {
- if (appMasterRegistry.contains(appId)) {
- // Add the dead app to dead appMasters
- deadAppMasters += appId
- // Remove the dead app from active appMasters
- activeAppMasters -= appId
-
- appMasterRegistry += appId -> {
- val (ref, info) = appMasterRegistry(appId)
- (ref, info.copy(finishTime = System.currentTimeMillis()))
- }
- kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
- MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
- kvService ! DeleteKVGroup(appId.toString)
- }
- }
-
- private def applicationNameExist(appName: String): Boolean = {
- appMasterRegistry.values.exists { case (_, info) =>
- info.appName == appName && !deadAppMasters.contains(info.appId)
- }
- }
-}
-
-object AppManager {
- final val APP_STATE = "app_state"
- // The id is used in KVStore
- final val MASTER_STATE = "master_state"
-
- case class MasterState(
- maxId: Int,
- appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
- activeAppMasters: Set[Int],
- deadAppMasters: Set[Int])
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
deleted file mode 100644
index 3e54214..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.TimeoutException
-import scala.concurrent.duration.Duration
-
-import akka.actor._
-import akka.cluster.Cluster
-import akka.cluster.ddata.Replicator._
-import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.LogUtil
-
-/**
- * A replicated simple in-memory KV service. The replications are stored on all masters.
- */
-class InMemoryKVService extends Actor with Stash {
- import org.apache.gearpump.cluster.master.InMemoryKVService._
-
- private val KV_SERVICE = "gearpump_kvservice"
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
- private val replicator = DistributedData(context.system).replicator
- private implicit val cluster = Cluster(context.system)
-
- // Optimize write path, we can tolerate one master down for recovery.
- private val timeout = Duration(15, TimeUnit.SECONDS)
- private val readMajority = ReadMajority(timeout)
- private val writeMajority = WriteMajority(timeout)
-
- private def groupKey(group: String): LWWMapKey[Any] = {
- LWWMapKey[Any](KV_SERVICE + "_" + group)
- }
-
- def receive: Receive = kvService
-
- def kvService: Receive = {
-
- case GetKV(group: String, key: String) =>
- val request = Request(sender(), key)
- replicator ! Get(groupKey(group), readMajority, Some(request))
- case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
- val appData = success.get(group)
- LOG.info(s"Successfully retrived group: ${group.id}")
- request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
- case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
- LOG.info(s"We cannot find group $group")
- request.client ! GetKVSuccess(request.key, null)
- case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
- val error = s"Failed to get application data, the request key is ${request.key}"
- LOG.error(error)
- request.client ! GetKVFailed(new Exception(error))
-
- case PutKV(group: String, key: String, value: Any) =>
- val request = Request(sender(), key)
- val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map =>
- map + (key -> value)
- }
- replicator ! update
- case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
- request.client ! PutKVSuccess
- case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
- request.client ! PutKVFailed(request.key, new Exception(error, cause))
- case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
- request.client ! PutKVFailed(request.key, new TimeoutException())
-
- case delete@DeleteKVGroup(group: String) =>
- replicator ! Delete(groupKey(group), writeMajority)
- case DeleteSuccess(group) =>
- LOG.info(s"KV Group ${group.id} is deleted")
- case ReplicationDeleteFailure(group) =>
- LOG.error(s"Failed to delete KV Group ${group.id}...")
- case DataDeleted(group) =>
- LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
- }
-}
-
-object InMemoryKVService {
- /**
- * KV Service related
- */
- case class GetKV(group: String, key: String)
-
- trait GetKVResult
-
- case class GetKVSuccess(key: String, value: Any) extends GetKVResult
-
- case class GetKVFailed(ex: Throwable) extends GetKVResult
-
- case class PutKV(group: String, key: String, value: Any)
-
- case class DeleteKVGroup(group: String)
-
- case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
-
- trait PutKVResult
-
- case object PutKVSuccess extends PutKVResult
-
- case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
-
- case class Request(client: ActorRef, key: String)
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
deleted file mode 100644
index 6b4df07..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import java.lang.management.ManagementFactory
-import org.apache.gearpump.cluster.worker.WorkerId
-import org.apache.gearpump.jarstore.JarStoreServer
-
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-
-import akka.actor._
-import akka.remote.DisassociatedEvent
-import com.typesafe.config.Config
-import org.apache.commons.lang.exception.ExceptionUtils
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.AppMasterToMaster._
-import org.apache.gearpump.cluster.ClientToMaster._
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.MasterToAppMaster._
-import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
-import org.apache.gearpump.cluster.MasterToWorker._
-import org.apache.gearpump.cluster.WorkerToMaster._
-import org.apache.gearpump.cluster.master.InMemoryKVService._
-import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
-import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import org.apache.gearpump.metrics.Metrics.ReportMetrics
-import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import org.apache.gearpump.util._
-
-/**
- * Master Actor who manages resources of the whole cluster.
- * It is like the resource manager of YARN.
- */
-private[cluster] class Master extends Actor with Stash {
- private val LOG: Logger = LogUtil.getLogger(getClass)
- private val systemConfig: Config = context.system.settings.config
- private implicit val timeout = Constants.FUTURE_TIMEOUT
- private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")
- // Resources and resourceRequests can be dynamically constructed by
- // heartbeat of worker and appmaster when master singleton is migrated.
- // We don't need to persist them in cluster
- private var appManager: ActorRef = null
-
- private var scheduler: ActorRef = null
-
- private var workers = new immutable.HashMap[ActorRef, WorkerId]
-
- private val birth = System.currentTimeMillis()
-
- private var nextWorkerId = 0
-
- def receive: Receive = null
-
- // Register jvm metrics
- Metrics(context.system).register(new JvmMetricsSet(s"master"))
-
- LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...")
-
- val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
-
- private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath))
-
- private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
-
- // Maintain the list of active masters.
- private var masters: List[MasterNode] = {
- // Add myself into the list of initial masters.
- List(MasterNode(hostPort.host, hostPort.port))
- }
-
- val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
-
- val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
- val historyMetricsService = if (metricsEnabled) {
- val historyMetricsService = {
- context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))
- }
-
- val metricsReportService = context.actorOf(
- Props(new MetricsReporterService(Metrics(context.system))))
- historyMetricsService.tell(ReportMetrics, metricsReportService)
- Some(historyMetricsService)
- } else {
- None
- }
-
- kvService ! GetKV(MASTER_GROUP, WORKER_ID)
- context.become(waitForNextWorkerId)
-
- def waitForNextWorkerId: Receive = {
- case GetKVSuccess(_, result) =>
- if (result != null) {
- this.nextWorkerId = result.asInstanceOf[Int]
- } else {
- LOG.warn("Cannot find existing state in the distributed cluster...")
- }
- context.become(receiveHandler)
- unstashAll()
- case GetKVFailed(ex) =>
- LOG.error("Failed to get worker id, shutting down master to avoid data corruption...")
- context.parent ! PoisonPill
- case msg =>
- LOG.info(s"Get message ${msg.getClass.getSimpleName}")
- stash()
- }
-
- def receiveHandler: Receive = workerMsgHandler orElse
- appMasterMsgHandler orElse
- onMasterListChange orElse
- clientMsgHandler orElse
- metricsService orElse
- jarStoreService orElse
- terminationWatch orElse
- disassociated orElse
- kvServiceMsgHandler orElse
- ActorUtil.defaultMsgHandler(self)
-
- def workerMsgHandler: Receive = {
- case RegisterNewWorker =>
- val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
- nextWorkerId += 1
- kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
- val workerHostname = ActorUtil.getHostname(sender())
- LOG.info(s"Register new from $workerHostname ....")
- self forward RegisterWorker(workerId)
-
- case RegisterWorker(id) =>
- context.watch(sender())
- sender ! WorkerRegistered(id, MasterInfo(self, birth))
- scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
- workers += (sender() -> id)
- val workerHostname = ActorUtil.getHostname(sender())
- LOG.info(s"Register Worker with id $id from $workerHostname ....")
- case resourceUpdate: ResourceUpdate =>
- scheduler forward resourceUpdate
- }
-
- def jarStoreService: Receive = {
- case GetJarStoreServer =>
- jarStore forward GetJarStoreServer
- }
-
- def kvServiceMsgHandler: Receive = {
- case PutKVSuccess =>
- // Skip
- case PutKVFailed(key, exception) =>
- LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
- ExceptionUtils.getStackTrace(exception))
- }
-
- def metricsService: Receive = {
- case query: QueryHistoryMetrics =>
- if (historyMetricsService.isEmpty) {
- // Returns empty metrics so that we don't hang the UI
- sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
- } else {
- historyMetricsService.get forward query
- }
- }
-
- def appMasterMsgHandler: Receive = {
- case request: RequestResource =>
- scheduler forward request
- case registerAppMaster: RegisterAppMaster =>
- appManager forward registerAppMaster
- case activateAppMaster: ActivateAppMaster =>
- appManager forward activateAppMaster
- case save: SaveAppData =>
- appManager forward save
- case get: GetAppData =>
- appManager forward get
- case GetAllWorkers =>
- sender ! WorkerList(workers.values.toList)
- case GetMasterData =>
- val aliveFor = System.currentTimeMillis() - birth
- val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
- val userDir = System.getProperty("user.dir")
-
- val masterDescription =
- MasterSummary(
- MasterNode(hostPort.host, hostPort.port),
- masters,
- aliveFor,
- logFileDir,
- jarStoreRootPath,
- MasterStatus.Synced,
- userDir,
- List.empty[MasterActivity],
- jvmName = ManagementFactory.getRuntimeMXBean().getName(),
- historyMetricsConfig = getHistoryMetricsConfig
- )
-
- sender ! MasterData(masterDescription)
-
- case invalidAppMaster: InvalidAppMaster =>
- appManager forward invalidAppMaster
- }
-
- import scala.util.{Failure, Success}
-
- def onMasterListChange: Receive = {
- case MasterListUpdated(masters: List[MasterNode]) =>
- this.masters = masters
- }
-
- def clientMsgHandler: Receive = {
- case app: SubmitApplication =>
- LOG.debug(s"Receive from client, SubmitApplication $app")
- appManager.forward(app)
- case app: RestartApplication =>
- LOG.debug(s"Receive from client, RestartApplication $app")
- appManager.forward(app)
- case app: ShutdownApplication =>
- LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
- scheduler ! ApplicationFinished(app.appId)
- appManager.forward(app)
- case app: ResolveAppId =>
- LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef")
- appManager.forward(app)
- case resolve: ResolveWorkerId =>
- LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
- val worker = workers.find(_._2 == resolve.workerId)
- worker match {
- case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1))
- case None => sender ! ResolveWorkerIdResult(Failure(
- new Exception(s"cannot find worker ${resolve.workerId}")))
- }
- case AppMastersDataRequest =>
- LOG.debug("Master received AppMastersDataRequest")
- appManager forward AppMastersDataRequest
- case appMasterDataRequest: AppMasterDataRequest =>
- LOG.debug("Master received AppMasterDataRequest")
- appManager forward appMasterDataRequest
- case query: QueryAppMasterConfig =>
- LOG.debug("Master received QueryAppMasterConfig")
- appManager forward query
- case QueryMasterConfig =>
- sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
- }
-
- def disassociated: Receive = {
- case disassociated: DisassociatedEvent =>
- LOG.info(s" disassociated ${disassociated.remoteAddress}")
- }
-
- def terminationWatch: Receive = {
- case t: Terminated =>
- val actor = t.actor
- LOG.info(s"worker ${actor.path} get terminated, is it due to network reason?" +
- t.getAddressTerminated())
-
- LOG.info("Let's filter out dead resources...")
- // Filters out dead worker resource
- if (workers.keySet.contains(actor)) {
- scheduler ! WorkerTerminated(workers.get(actor).get)
- workers -= actor
- }
- }
-
- override def preStart(): Unit = {
- val path = ActorUtil.getFullPath(context.system, self.path)
- LOG.info(s"master path is $path")
- val schedulerClass = Class.forName(
- systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
-
- appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
- classOf[AppManager].getSimpleName)
- scheduler = context.actorOf(Props(schedulerClass))
- context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
- }
-}
-
-object Master {
- final val MASTER_GROUP = "master_group"
-
- final val WORKER_ID = "next_worker_id"
-
- case class WorkerTerminated(workerId: WorkerId)
-
- case class MasterInfo(master: ActorRef, startTime: Long = 0L)
-
- /** Notify the subscriber that master actor list has been updated */
- case class MasterListUpdated(masters: List[MasterNode])
-
- object MasterInfo {
- def empty: MasterInfo = MasterInfo(null)
- }
-
- case class SlotStatus(totalSlots: Int, availableSlots: Int)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
deleted file mode 100644
index 1429694..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.scheduler
-
-import org.apache.gearpump.cluster.worker.WorkerId
-
-import scala.collection.mutable
-
-import akka.actor.ActorRef
-
-import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
-import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import org.apache.gearpump.cluster.scheduler.Relaxation._
-import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest
-
-/** Assign resource to application based on the priority of the application */
-class PriorityScheduler extends Scheduler {
- private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
-
- def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
- override def compare(x: PendingRequest, y: PendingRequest): Int = {
- var res = x.request.priority.id - y.request.priority.id
- if (res == 0) {
- res = y.timeStamp.compareTo(x.timeStamp)
- }
- res
- }
- }
-
- override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler
-
- override def allocateResource(): Unit = {
- var scheduleLater = Array.empty[PendingRequest]
- val resourcesSnapShot = resources.clone()
- var allocated = Resource.empty
- val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
-
- while (resourceRequests.nonEmpty && (allocated < totalResource)) {
- val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
- request.relaxation match {
- case ANY =>
- val allocations = allocateFairly(resourcesSnapShot, request)
- val newAllocated = Resource(allocations.map(_.resource.slots).sum)
- if (allocations.nonEmpty) {
- appMaster ! ResourceAllocated(allocations.toArray)
- }
- if (newAllocated < request.resource) {
- val remainingRequest = request.resource - newAllocated
- val remainingExecutors = request.executorNum - allocations.length
- val newResourceRequest = request.copy(resource = remainingRequest,
- executorNum = remainingExecutors)
- scheduleLater = scheduleLater :+
- PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
- }
- allocated = allocated + newAllocated
- case ONEWORKER =>
- val availableResource = resourcesSnapShot.find { params =>
- val (_, (_, resource)) = params
- resource > request.resource
- }
- if (availableResource.nonEmpty) {
- val (workerId, (worker, resource)) = availableResource.get
- allocated = allocated + request.resource
- appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
- workerId)))
- resourcesSnapShot.update(workerId, (worker, resource - request.resource))
- } else {
- scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
- }
- case SPECIFICWORKER =>
- val workerAndResource = resourcesSnapShot.get(request.workerId)
- if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) {
- val (worker, availableResource) = workerAndResource.get
- appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
- request.workerId)))
- allocated = allocated + request.resource
- resourcesSnapShot.update(request.workerId, (worker,
- availableResource - request.resource))
- } else {
- scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
- }
- }
- }
- for (request <- scheduleLater)
- resourceRequests.enqueue(request)
- }
-
- def resourceRequestHandler: Receive = {
- case RequestResource(appId, request) =>
- LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
- s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
- val appMaster = sender()
- resourceRequests.enqueue(new PendingRequest(appId, appMaster, request,
- System.currentTimeMillis()))
- allocateResource()
- }
-
- override def doneApplication(appId: Int): Unit = {
- resourceRequests = resourceRequests.filter(_.appId != appId)
- }
-
- private def allocateFairly(
- resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
- : List[ResourceAllocation] = {
- val workerNum = resources.size
- var allocations = List.empty[ResourceAllocation]
- var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
- var remainingRequest = request.resource
- var remainingExecutors = Math.min(request.executorNum, request.resource.slots)
-
- while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
- val exeutorNum = Math.min(workerNum, remainingExecutors)
- val toRequest = Resource(remainingRequest.slots * exeutorNum / remainingExecutors)
-
- val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
- val pickedResources = sortedResources.take(exeutorNum)
-
- val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex =>
- val ((workerId, (worker, resource)), index) = workerWithIndex
- 0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
- }.sortBy(_._2).map(_._1)
-
- if (flattenResource.length < toRequest.slots) {
- // Can not safisfy the user's requirements
- totalAvailable = Resource.empty
- } else {
- flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
- toArray.foreach { params =>
- val ((workerId, worker), slots) = params
- resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots)))
- allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
- }
- totalAvailable -= toRequest
- remainingRequest -= toRequest
- remainingExecutors -= exeutorNum
- }
- }
- allocations
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
deleted file mode 100644
index 7187c1a..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.scheduler
-
-import org.apache.gearpump.cluster.worker.WorkerId
-
-import scala.collection.mutable
-
-import akka.actor.{Actor, ActorRef}
-import org.slf4j.Logger
-
-import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
-import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
-import org.apache.gearpump.cluster.master.Master.WorkerTerminated
-import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Scheduler schedule resource for different applications.
- */
-abstract class Scheduler extends Actor {
- val LOG: Logger = LogUtil.getLogger(getClass)
- protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
-
- def handleScheduleMessage: Receive = {
- case WorkerRegistered(id, _) =>
- if (!resources.contains(id)) {
- LOG.info(s"Worker $id added to the scheduler")
- resources.put(id, (sender, Resource.empty))
- }
- case update@ResourceUpdate(worker, workerId, resource) =>
- LOG.info(s"$update...")
- if (resources.contains(workerId)) {
- val resourceReturned = resource > resources.get(workerId).get._2
- resources.update(workerId, (worker, resource))
- if (resourceReturned) {
- allocateResource()
- }
- sender ! UpdateResourceSucceed
- }
- else {
- sender ! UpdateResourceFailed(
- s"ResourceUpdate failed! The worker $workerId has not been registered into master")
- }
- case WorkerTerminated(workerId) =>
- if (resources.contains(workerId)) {
- resources -= workerId
- }
- case ApplicationFinished(appId) =>
- doneApplication(appId)
- }
-
- def allocateResource(): Unit
-
- def doneApplication(appId: Int): Unit
-}
-
-object Scheduler {
- case class PendingRequest(
- appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
-
- case class ApplicationFinished(appId: Int)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
deleted file mode 100644
index b4e6f9e..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.worker
-
-import java.io.File
-
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.util.{LogUtil, RichProcess, Util}
-
-/** Launcher to start an executor process */
-class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- override def createProcess(
- appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
- classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
-
- LOG.info(s"Launch executor $executorId, classpath: ${classPath.mkString(File.pathSeparator)}")
- Util.startProcess(options, classPath, mainClass, arguments)
- }
-
- override def cleanProcess(appId: Int, executorId: Int): Unit = {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
deleted file mode 100644
index 1b52e5d..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
+++ /dev/null
@@ -1,581 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.worker
-
-import java.io.File
-import java.lang.management.ManagementFactory
-import java.net.URL
-import java.util.concurrent.{Executors, TimeUnit}
-import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher
-
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.SupervisorStrategy.Stop
-import akka.actor._
-import com.typesafe.config.{ConfigValueFactory, Config, ConfigFactory}
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import org.apache.gearpump.cluster.AppMasterToWorker._
-import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
-import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
-import org.apache.gearpump.cluster.MasterToWorker._
-import org.apache.gearpump.cluster.WorkerToAppMaster._
-import org.apache.gearpump.cluster.WorkerToMaster._
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
-import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
-import org.apache.gearpump.metrics.Metrics.ReportMetrics
-import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import org.apache.gearpump.util.ActorSystemBooter.Daemon
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import org.apache.gearpump.util.{TimeOutScheduler, _}
-
-/**
- * Worker is used to track the resource on single machine, it is like
- * the node manager of YARN.
- *
- * @param masterProxy masterProxy is used to resolve the master
- */
-private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
- private val systemConfig: Config = context.system.settings.config
-
- private val address = ActorUtil.getFullPath(context.system, self.path)
- private var resource = Resource.empty
- private var allocatedResources = Map[ActorRef, Resource]()
- private var executorsInfo = Map[ActorRef, ExecutorSlots]()
- private var id: WorkerId = WorkerId.unspecified
- private val createdTime = System.currentTimeMillis()
- private var masterInfo: MasterInfo = null
- private var executorNameToActor = Map.empty[String, ActorRef]
- private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
- private val jarStoreClient = new JarStoreClient(systemConfig, context.system)
-
- private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
- private val resourceUpdateTimeoutMs = 30000 // Milliseconds
-
- private var totalSlots: Int = 0
-
- val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
- var historyMetricsService: Option[ActorRef] = None
-
- override def receive: Receive = null
- var LOG: Logger = LogUtil.getLogger(getClass)
-
- def service: Receive =
- appMasterMsgHandler orElse
- clientMessageHandler orElse
- metricsService orElse
- terminationWatch(masterInfo.master) orElse
- ActorUtil.defaultMsgHandler(self)
-
- def metricsService: Receive = {
- case query: QueryHistoryMetrics =>
- if (historyMetricsService.isEmpty) {
- // Returns empty metrics so that we don't hang the UI
- sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
- } else {
- historyMetricsService.get forward query
- }
- }
-
- private var metricsInitialized = false
-
- val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
-
- private def initializeMetrics(): Unit = {
- // Registers jvm metrics
- val metricsSetName = "worker" + WorkerId.render(id)
- Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
-
- historyMetricsService = if (metricsEnabled) {
- val historyMetricsService = {
- context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
- }
-
- val metricsReportService = context.actorOf(Props(
- new MetricsReporterService(Metrics(context.system))))
- historyMetricsService.tell(ReportMetrics, metricsReportService)
- Some(historyMetricsService)
- } else {
- None
- }
- }
-
- def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
-
- // If master get disconnected, the WorkerRegistered may be triggered multiple times.
- case WorkerRegistered(id, masterInfo) =>
- this.id = id
-
- // Adds the flag check, so that we don't re-initialize the metrics when worker re-register
- // itself.
- if (!metricsInitialized) {
- initializeMetrics()
- metricsInitialized = true
- }
-
- this.masterInfo = masterInfo
- timeoutTicker.cancel()
- context.watch(masterInfo.master)
- this.LOG = LogUtil.getLogger(getClass, worker = id)
- LOG.info(s"Worker is registered. " +
- s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
- sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
- resourceUpdateTimeoutMs, updateResourceTimeOut())
- context.become(service)
- }
-
- private def updateResourceTimeOut(): Unit = {
- LOG.error(s"Update worker resource time out")
- }
-
- def appMasterMsgHandler: Receive = {
- case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
- val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
- val executorToStop = executorNameToActor.get(actorName)
- if (executorToStop.isDefined) {
- LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " +
- s"due to: $reason")
- executorToStop.get.forward(shutdown)
- } else {
- LOG.error(s"Cannot find executor $actorName, ignore this message")
- sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
- }
- case launch: LaunchExecutor =>
- LOG.info(s"$launch")
- if (resource < launch.resource) {
- sender ! ExecutorLaunchRejected("There is no free resource on this machine")
- } else {
- val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
-
- val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
- jarStoreClient, executorProcLauncher))
- executorNameToActor += actorName -> executor
-
- resource = resource - launch.resource
- allocatedResources = allocatedResources + (executor -> launch.resource)
-
- reportResourceToMaster()
- executorsInfo += executor ->
- ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
- context.watch(executor)
- }
- case UpdateResourceFailed(reason, ex) =>
- LOG.error(reason)
- context.stop(self)
- case UpdateResourceSucceed =>
- LOG.info(s"Update resource succeed")
- case GetWorkerData(workerId) =>
- val aliveFor = System.currentTimeMillis() - createdTime
- val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
- val userDir = System.getProperty("user.dir")
- sender ! WorkerData(WorkerSummary(
- id, "active",
- address,
- aliveFor,
- logDir,
- executorsInfo.values.toArray,
- totalSlots,
- resource.slots,
- userDir,
- jvmName = ManagementFactory.getRuntimeMXBean().getName(),
- resourceManagerContainerId = systemConfig.getString(
- GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
- historyMetricsConfig = getHistoryMetricsConfig)
- )
- case ChangeExecutorResource(appId, executorId, usedResource) =>
- for (executor <- executorActorRef(appId, executorId);
- allocatedResource <- allocatedResources.get(executor)) {
-
- allocatedResources += executor -> usedResource
- resource = resource + allocatedResource - usedResource
- reportResourceToMaster()
-
- if (usedResource == Resource(0)) {
- executorsInfo -= executor
- allocatedResources -= executor
- // stop executor if there is no resource binded to it.
- LOG.info(s"Shutdown executor $executorId because the resource used is zero")
- executor ! ShutdownExecutor(appId, executorId,
- "Shutdown executor because the resource used is zero")
- }
- }
- }
-
- private def reportResourceToMaster(): Unit = {
- sendMsgWithTimeOutCallBack(masterInfo.master,
- ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
- }
-
- private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
- val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
- executorNameToActor.get(actorName)
- }
-
- def clientMessageHandler: Receive = {
- case QueryWorkerConfig(workerId) =>
- if (this.id == workerId) {
- sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
- } else {
- sender ! WorkerConfig(ConfigFactory.empty)
- }
- }
-
- private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
- repeatActionUtil(
- seconds = timeOutSeconds,
- action = () => {
- masterProxy ! RegisterWorker(workerId)
- },
- onTimeout = () => {
- LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
- s"seconds, abort and kill the worker...")
- self ! PoisonPill
- })
- }
-
- def terminationWatch(master: ActorRef): Receive = {
- case Terminated(actor) =>
- if (actor.compareTo(master) == 0) {
- // Parent master is down, no point to keep worker anymore. Let's make suicide to free
- // resources
- LOG.info(s"Master cannot be contacted, find a new master ...")
- context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
- } else if (ActorUtil.isChildActorPath(self, actor)) {
- // One executor is down,
- LOG.info(s"Executor is down ${getExecutorName(actor)}")
-
- val allocated = allocatedResources.get(actor)
- if (allocated.isDefined) {
- resource = resource + allocated.get
- executorsInfo -= actor
- allocatedResources = allocatedResources - actor
- sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
- resourceUpdateTimeoutMs, updateResourceTimeOut())
- }
- }
- }
-
- private def getExecutorName(actorRef: ActorRef): Option[String] = {
- executorNameToActor.find(_._2 == actorRef).map(_._1)
- }
-
- private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
- val launcherClazz = Class.forName(
- systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
- launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
- .asInstanceOf[ExecutorProcessLauncher]
- }
-
- import context.dispatcher
- override def preStart(): Unit = {
- LOG.info(s"RegisterNewWorker")
- totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
- this.resource = Resource(totalSlots)
- masterProxy ! RegisterNewWorker
- context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
- }
-
- private def registerTimeoutTicker(seconds: Int): Cancellable = {
- repeatActionUtil(seconds, () => Unit, () => {
- LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
- s"abort and kill the worker...")
- self ! PoisonPill
- })
- }
-
- private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
- : Cancellable = {
- val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
- Duration(2, TimeUnit.SECONDS))(action())
- val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
- new Cancellable {
- def cancel(): Boolean = {
- val result1 = cancelTimeout.cancel()
- val result2 = cancelSuicide.cancel()
- result1 && result2
- }
-
- def isCancelled: Boolean = {
- cancelTimeout.isCancelled && cancelSuicide.isCancelled
- }
- }
- }
-
- override def postStop(): Unit = {
- LOG.info(s"Worker is going down....")
- ioPool.shutdown()
- context.system.terminate()
- }
-}
-
-private[cluster] object Worker {
-
- case class ExecutorResult(result: Try[Int])
-
- class ExecutorWatcher(
- launch: LaunchExecutor,
- masterInfo: MasterInfo,
- ioPool: ExecutionContext,
- jarStoreClient: JarStoreClient,
- procLauncher: ExecutorProcessLauncher) extends Actor {
- import launch.{appId, executorId, resource}
-
- private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
-
- val executorConfig: Config = {
- val workerConfig = context.system.settings.config
-
- val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
- Option(jvmConfig.executorAkkaConfig)
- }.getOrElse(ConfigFactory.empty())
-
- resolveExecutorConfig(workerConfig, submissionConfig)
- }
-
- // For some config, worker has priority, for others, user Application submission config
- // have priorities.
- private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
- val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
- .withoutPath(GEARPUMP_CLUSTER_MASTERS)
- .withoutPath(GEARPUMP_HOME)
- .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
- .withoutPath(GEARPUMP_LOG_APPLICATION_DIR)
- .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
- // Falls back to workerConfig
- .withFallback(workerConfig)
-
- // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
- val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
- val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
- LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
- config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
- } else {
- config
- }
-
- // Excludes reference.conf, and JVM properties..
- ClusterConfig.filterOutDefaultConfig(updatedConf)
- }
-
- implicit val executorService = ioPool
-
- private val executorHandler = {
- val ctx = launch.executorJvmConfig
-
- if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
- new ExecutorHandler {
- val exitPromise = Promise[Int]()
- val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
-
- override def destroy(): Unit = {
- context.stop(app)
- }
- override def exitValue: Future[Int] = {
- exitPromise.future
- }
- }
- } else {
- createProcess(ctx)
- }
- }
-
- private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
-
- val process = Future {
- val jarPath = ctx.jar.map { appJar =>
- val tempFile = File.createTempFile(appJar.name, ".jar")
- jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
- val file = new URL("file:" + tempFile)
- file.getFile
- }
-
- val configFile = {
- val configFile = File.createTempFile("gearpump", ".conf")
- ClusterConfig.saveConfig(executorConfig, configFile)
- val file = new URL("file:" + configFile)
- file.getFile
- }
-
- val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
- ctx.classPath.map(path => expandEnviroment(path)) ++
- jarPath.map(Array(_)).getOrElse(Array.empty[String])
-
- val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
- val logArgs = List(
- s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
- s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
- s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
- s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
- val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
-
- val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
-
- // Remote debug executor process
- val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
- val remoteDebugConfig = if (remoteDebugFlag) {
- val availablePort = Util.findFreePort().get
- List(
- "-Xdebug",
- s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
- s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
- )
- } else {
- List.empty[String]
- }
-
- val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
- val verboseGCConfig = if (verboseGCFlag) {
- List(
- s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
- "-verbose:gc",
- "-XX:+PrintGCDetails",
- "-XX:+PrintGCDateStamps",
- "-XX:+PrintTenuringDistribution",
- "-XX:+PrintGCApplicationConcurrentTime",
- "-XX:+PrintGCApplicationStoppedTime"
- )
- } else {
- List.empty[String]
- }
-
- val ipv4 = List(s"-D${PREFER_IPV4}=true")
-
- val options = ctx.jvmArguments ++ username ++
- logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
-
- val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
- options, classPath, ctx.mainClass, ctx.arguments)
-
- ProcessInfo(process, jarPath, configFile)
- }
-
- new ExecutorHandler {
-
- var destroyed = false
-
- override def destroy(): Unit = {
- LOG.info(s"Destroy executor process ${ctx.mainClass}")
- if (!destroyed) {
- destroyed = true
- process.foreach { info =>
- info.process.destroy()
- info.jarPath.foreach(new File(_).delete())
- new File(info.configFile).delete()
- }
- }
- }
-
- override def exitValue: Future[Int] = {
- process.flatMap { info =>
- val exit = info.process.exitValue()
- if (exit == 0) {
- Future.successful(0)
- } else {
- Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
- s"error summary: ${info.process.logger.error}"))
- }
- }
- }
- }
- }
-
- private def expandEnviroment(path: String): String = {
- // TODO: extend this to support more environment.
- path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
- }
-
- override def preStart(): Unit = {
- executorHandler.exitValue.onComplete { value =>
- procLauncher.cleanProcess(appId, executorId)
- val result = ExecutorResult(value)
- self ! result
- }
- }
-
- override def postStop(): Unit = {
- executorHandler.destroy()
- }
-
- // The folders are under ${GEARPUMP_HOME}
- val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" +
- File.separator + "yarn")
-
- override def receive: Receive = {
- case ShutdownExecutor(appId, executorId, reason: String) =>
- executorHandler.destroy()
- sender ! ShutdownExecutorSucceed(appId, executorId)
- context.stop(self)
- case ExecutorResult(executorResult) =>
- executorResult match {
- case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
- case Failure(e) => LOG.error("Executor exit with errors", e)
- }
- context.stop(self)
- }
-
- private def getFormatedTime(timestamp: Long): String = {
- val datePattern = "yyyy-MM-dd-HH-mm"
- val format = new java.text.SimpleDateFormat(datePattern)
- format.format(timestamp)
- }
-
- private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
- classPath.filterNot(matchDaemonPattern(_))
- }
-
- private def matchDaemonPattern(path: String): Boolean = {
- daemonPathPattern.exists(path.contains(_))
- }
- }
-
- trait ExecutorHandler {
- def destroy(): Unit
- def exitValue: Future[Int]
- }
-
- case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
-
- /**
- * Starts the executor in the same JVM as worker.
- */
- class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
- extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
- private val exitCode = 0
-
- override val supervisorStrategy =
- OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
- case ex: Throwable =>
- LOG.error(s"system $name stopped ", ex)
- exit.failure(ex)
- Stop
- }
-
- override def postStop(): Unit = {
- if (!exit.isCompleted) {
- exit.success(exitCode)
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
deleted file mode 100644
index a6b75cb..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.pattern.ask
-import akka.testkit.TestActorRef
-import com.typesafe.config.ConfigValueFactory
-
-import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
-import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
-import org.apache.gearpump.cluster.master.Master
-import org.apache.gearpump.cluster.worker.Worker
-import org.apache.gearpump.util.Constants
-
-class MiniCluster {
- private val mockMasterIP = "127.0.0.1"
-
- implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
- withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP)))
-
- val (mockMaster, worker) = {
- val master = system.actorOf(Props(classOf[Master]), "master")
- val worker = system.actorOf(Props(classOf[Worker], master), "worker")
-
- // Wait until worker register itself to master
- waitUtilWorkerIsRegistered(master)
- (master, worker)
- }
-
- def launchActor(props: Props): TestActorRef[Actor] = {
- TestActorRef(props)
- }
-
- private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = {
- while (!isWorkerRegistered(master)) {}
- }
-
- private def isWorkerRegistered(master: ActorRef): Boolean = {
- import scala.concurrent.duration._
- implicit val dispatcher = system.dispatcher
-
- implicit val futureTimeout = Constants.FUTURE_TIMEOUT
-
- val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]]
-
- // Waits until the worker is registered.
- val workers = Await.result[WorkerList](workerListFuture, 15.seconds)
- workers.workers.size > 0
- }
-
- def shutDown(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
deleted file mode 100644
index 90fdd39..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import java.util.Properties
-
-import akka.testkit.TestProbe
-import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
-import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.transport.HostPort
-
-import scala.concurrent.Future
-import scala.util.{Success, Try}
-
-import com.typesafe.config.{ConfigFactory, Config}
-import org.scalatest._
-
-import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _}
-import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult}
-import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.{Constants, LogUtil, Util}
-
-class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-
- private val LOG = LogUtil.getLogger(getClass)
-
- override def config: Config = TestUtil.DEFAULT_CONFIG
-
- override def beforeEach(): Unit = {
- startActorSystem()
- }
-
- override def afterEach(): Unit = {
- shutdownActorSystem()
- }
-
- "Worker" should "register worker address to master when started." in {
-
- val masterReceiver = createMockMaster()
-
- val tempTestConf = convertTestConf(getHost, getPort)
-
- val options = Array(
- s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
- s"-D${PREFER_IPV4}=true"
- ) ++ getMasterListOption()
-
- val worker = Util.startProcess(options,
- getContextClassPath,
- getMainClassName(Worker),
- Array.empty)
-
- try {
- masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
-
- tempTestConf.delete()
- } finally {
- worker.destroy()
- }
- }
-
- "Master" should "accept worker RegisterNewWorker when started" in {
- val worker = TestProbe()(getActorSystem)
-
- val host = "127.0.0.1"
- val port = Util.findFreePort().get
-
- val properties = new Properties()
- properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port")
- properties.put(s"${GEARPUMP_HOSTNAME}", s"$host")
- val masterConfig = ConfigFactory.parseProperties(properties)
- .withFallback(TestUtil.MASTER_CONFIG)
- Future {
- Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString))
- }
-
- val masterProxy = getActorSystem.actorOf(
- MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
-
- worker.send(masterProxy, RegisterNewWorker)
- worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
- }
-
- "Info" should "be started without exception" in {
-
- val masterReceiver = createMockMaster()
-
- Future {
- org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
- }
-
- masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
- masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName"))))
- }
-
- "Kill" should "be started without exception" in {
-
- val masterReceiver = createMockMaster()
-
- Future {
- Kill.main(masterConfig, Array("-appid", "0"))
- }
-
- masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
- masterReceiver.reply(ShutdownApplicationResult(Success(0)))
- }
-
- "Replay" should "be started without exception" in {
-
- val masterReceiver = createMockMaster()
-
- Future {
- Replay.main(masterConfig, Array("-appid", "0"))
- }
-
- masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
- masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
- masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
- masterReceiver.reply(ReplayApplicationResult(Success(0)))
- }
-
- "Local" should "be started without exception" in {
- val port = Util.findFreePort().get
- val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
- s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
- s"-D${PREFER_IPV4}=true")
-
- val local = Util.startProcess(options,
- getContextClassPath,
- getMainClassName(Local),
- Array.empty)
-
- def retry(times: Int)(fn: => Boolean): Boolean = {
-
- LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..")
-
- val result = fn
- if (result || times <= 0) {
- result
- } else {
- Thread.sleep(1000)
- retry(times - 1)(fn)
- }
- }
-
- try {
- assert(retry(10)(isPortUsed("127.0.0.1", port)),
- "local is not started successfully, as port is not used " + port)
- } finally {
- local.destroy()
- }
- }
-
- "Gear" should "support app|info|kill|shell|replay" in {
-
- val commands = Array("app", "info", "kill", "shell", "replay")
-
- assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
-
- for (command <- commands) {
- assert(Try(Gear.main(Array("-noexist"))).isFailure,
- "pass unknown option, throw, command: " + command)
- }
-
- assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ")
-
- val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
- assert(tryThis.isFailure, "unknown command, throw")
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
deleted file mode 100644
index e1ba8f6..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.gearpump.cluster.TestUtil
-
-class MasterWatcherSpec extends FlatSpec with Matchers {
- def config: Config = TestUtil.MASTER_CONFIG
-
- "MasterWatcher" should "kill itself when can not get a quorum" in {
- val system = ActorSystem("ForMasterWatcher", config)
-
- val actorWatcher = TestProbe()(system)
-
- val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
- actorWatcher watch masterWatcher
- actorWatcher.expectTerminated(masterWatcher, 5.seconds)
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
deleted file mode 100644
index 58e3593..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import scala.util.Success
-
-import akka.actor.{Actor, ActorRef, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
-import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
-import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
-import org.apache.gearpump.cluster.master.AppManager._
-import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess}
-import org.apache.gearpump.cluster.{TestUtil, _}
-import org.apache.gearpump.util.LogUtil
-
-class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
- var kvService: TestProbe = null
- var haService: TestProbe = null
- var appLauncher: TestProbe = null
- var appManager: ActorRef = null
- private val LOG = LogUtil.getLogger(getClass)
-
- override def config: Config = TestUtil.DEFAULT_CONFIG
-
- override def beforeEach(): Unit = {
- startActorSystem()
- kvService = TestProbe()(getActorSystem)
- appLauncher = TestProbe()(getActorSystem)
-
- appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
- new DummyAppMasterLauncherFactory(appLauncher))))
- kvService.expectMsgType[GetKV]
- kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty)))
- }
-
- override def afterEach(): Unit = {
- shutdownActorSystem()
- }
-
- "AppManager" should "handle AppMaster message correctly" in {
- val appMaster = TestProbe()(getActorSystem)
- val appId = 1
-
- val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName"))
- appMaster.send(appManager, register)
- appMaster.expectMsgType[AppMasterRegistered]
-
- appMaster.send(appManager, ActivateAppMaster(appId))
- appMaster.expectMsgType[AppMasterActivated]
- }
-
- "DataStoreService" should "support Put and Get" in {
- val appMaster = TestProbe()(getActorSystem)
- appMaster.send(appManager, SaveAppData(0, "key", 1))
- kvService.expectMsgType[PutKV]
- kvService.reply(PutKVSuccess)
- appMaster.expectMsg(AppDataSaved)
-
- appMaster.send(appManager, GetAppData(0, "key"))
- kvService.expectMsgType[GetKV]
- kvService.reply(GetKVSuccess("key", 1))
- appMaster.expectMsg(GetAppDataResult("key", 1))
- }
-
- "AppManager" should "support application submission and shutdown" in {
- testClientSubmission(withRecover = false)
- }
-
- "AppManager" should "support application submission and recover if appmaster dies" in {
- LOG.info("=================testing recover==============")
- testClientSubmission(withRecover = true)
- }
-
- "AppManager" should "handle client message correctly" in {
- val mockClient = TestProbe()(getActorSystem)
- mockClient.send(appManager, ShutdownApplication(1))
- assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure)
-
- mockClient.send(appManager, ResolveAppId(1))
- assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
-
- mockClient.send(appManager, AppMasterDataRequest(1))
- mockClient.expectMsg(AppMasterData(AppMasterNonExist))
- }
-
- "AppManager" should "reject the application submission if the app name already existed" in {
- val app = TestUtil.dummyApp
- val submit = SubmitApplication(app, None, "username")
- val client = TestProbe()(getActorSystem)
- val appMaster = TestProbe()(getActorSystem)
- val worker = TestProbe()(getActorSystem)
- val appId = 1
-
- client.send(appManager, submit)
-
- kvService.expectMsgType[PutKV]
- appLauncher.expectMsg(LauncherStarted(appId))
- appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
- AppMasterRuntimeInfo(appId, app.name)))
- appMaster.expectMsgType[AppMasterRegistered]
-
- client.send(appManager, submit)
- assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
- }
-
- def testClientSubmission(withRecover: Boolean): Unit = {
- val app = TestUtil.dummyApp
- val submit = SubmitApplication(app, None, "username")
- val client = TestProbe()(getActorSystem)
- val appMaster = TestProbe()(getActorSystem)
- val worker = TestProbe()(getActorSystem)
- val appId = 1
-
- client.send(appManager, submit)
-
- kvService.expectMsgType[PutKV]
- appLauncher.expectMsg(LauncherStarted(appId))
- appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
- AppMasterRuntimeInfo(appId, app.name)))
- kvService.expectMsgType[PutKV]
- appMaster.expectMsgType[AppMasterRegistered]
-
- client.send(appManager, ResolveAppId(appId))
- client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
-
- client.send(appManager, AppMastersDataRequest)
- client.expectMsgType[AppMastersData]
-
- client.send(appManager, AppMasterDataRequest(appId, false))
- client.expectMsgType[AppMasterData]
-
- if (!withRecover) {
- client.send(appManager, ShutdownApplication(appId))
- client.expectMsg(ShutdownApplicationResult(Success(appId)))
- } else {
- // Do recovery
- getActorSystem.stop(appMaster.ref)
- kvService.expectMsgType[GetKV]
- val appState = ApplicationState(appId, "application1", 1, app, None, "username", null)
- kvService.reply(GetKVSuccess(APP_STATE, appState))
- appLauncher.expectMsg(LauncherStarted(appId))
- }
- }
-}
-
-class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory {
-
- override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
- username: String, master: ActorRef, client: Option[ActorRef]): Props = {
- Props(new DummyAppMasterLauncher(test, appId))
- }
-}
-
-class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
-
- test.ref ! LauncherStarted(appId)
- override def receive: Receive = {
- case any: Any => test.ref forward any
- }
-}
-
-case class LauncherStarted(appId: Int)
[4/4] incubator-gearpump git commit: [GEARPUMP-224] merge
gearpump-daemon to gearpump-core
Posted by ma...@apache.org.
[GEARPUMP-224] merge gearpump-daemon to gearpump-core
Author: huafengw <fv...@gmail.com>
Closes #98 from huafengw/merge.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c3d5eb63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c3d5eb63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c3d5eb63
Branch: refs/heads/master
Commit: c3d5eb63f1d0c6c542268e21fe8356e042dfa232
Parents: a01809b
Author: huafengw <fv...@gmail.com>
Authored: Fri Oct 14 19:55:05 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Oct 14 19:55:05 2016 +0800
----------------------------------------------------------------------
.../apache/gearpump/cluster/DaemonMessage.scala | 50 ++
.../cluster/embedded/EmbeddedCluster.scala | 95 +++
.../apache/gearpump/cluster/main/Local.scala | 89 +++
.../apache/gearpump/cluster/main/Master.scala | 236 ++++++++
.../apache/gearpump/cluster/main/Worker.scala | 70 +++
.../gearpump/cluster/master/AppManager.scala | 354 +++++++++++
.../cluster/master/InMemoryKVService.scala | 122 ++++
.../apache/gearpump/cluster/master/Master.scala | 311 ++++++++++
.../cluster/scheduler/PriorityScheduler.scala | 154 +++++
.../gearpump/cluster/scheduler/Scheduler.scala | 77 +++
.../worker/DefaultExecutorProcessLauncher.scala | 40 ++
.../apache/gearpump/cluster/worker/Worker.scala | 579 ++++++++++++++++++
.../apache/gearpump/cluster/MiniCluster.scala | 73 +++
.../cluster/appmaster/AppManagerSpec.scala | 182 ++++++
.../appmaster/InMemoryKVServiceSpec.scala | 69 +++
.../apache/gearpump/cluster/main/MainSpec.scala | 188 ++++++
.../cluster/main/MasterWatcherSpec.scala | 43 ++
.../scheduler/PrioritySchedulerSpec.scala | 230 ++++++++
.../gearpump/cluster/worker/WorkerSpec.scala | 128 ++++
.../apache/gearpump/cluster/DaemonMessage.scala | 51 --
.../cluster/embedded/EmbeddedCluster.scala | 95 ---
.../apache/gearpump/cluster/main/Local.scala | 90 ---
.../apache/gearpump/cluster/main/Master.scala | 236 --------
.../apache/gearpump/cluster/main/Worker.scala | 71 ---
.../gearpump/cluster/master/AppManager.scala | 355 -----------
.../cluster/master/InMemoryKVService.scala | 122 ----
.../apache/gearpump/cluster/master/Master.scala | 311 ----------
.../cluster/scheduler/PriorityScheduler.scala | 156 -----
.../gearpump/cluster/scheduler/Scheduler.scala | 79 ---
.../worker/DefaultExecutorProcessLauncher.scala | 41 --
.../apache/gearpump/cluster/worker/Worker.scala | 581 -------------------
.../apache/gearpump/cluster/MiniCluster.scala | 74 ---
.../apache/gearpump/cluster/main/MainSpec.scala | 190 ------
.../cluster/main/MasterWatcherSpec.scala | 44 --
.../cluster/master/AppManagerSpec.scala | 184 ------
.../cluster/master/InMemoryKVServiceSpec.scala | 69 ---
.../scheduler/PrioritySchedulerSpec.scala | 232 --------
.../gearpump/cluster/worker/WorkerSpec.scala | 129 ----
.../apache/gearpump/redis/RedisMessage.scala | 148 +++--
.../org/apache/gearpump/redis/RedisSink.scala | 27 +-
project/Build.scala | 41 +-
project/BuildExample.scala | 8 +-
project/Pack.scala | 14 +-
43 files changed, 3227 insertions(+), 3211 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
new file mode 100644
index 0000000..1e94132
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.ActorRef
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+
+/**
+ * Cluster Bootup Flow
+ */
+object WorkerToMaster {
+
+ /** When an worker is started, it sends RegisterNewWorker */
+ case object RegisterNewWorker
+
+ /** When worker lose connection with master, it tries to register itself again with old Id. */
+ case class RegisterWorker(workerId: WorkerId)
+
+ /** Worker is responsible to broadcast its current status to master */
+ case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
+}
+
+object MasterToWorker {
+
+ /** Master confirm the reception of RegisterNewWorker or RegisterWorker */
+ case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
+
+ /** Worker have not received reply from master */
+ case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
+
+ /** Master is synced with worker on resource slots managed by current worker */
+ case object UpdateResourceSucceed
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
new file mode 100644
index 0000000..9bde4d1
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.embedded
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import com.typesafe.config.{Config, ConfigValueFactory}
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.master.{Master => MasterActor}
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
+import org.apache.gearpump.util.{LogUtil, Util}
+
+/**
+ * Create a in-process cluster with single worker
+ */
+class EmbeddedCluster(inputConfig: Config) {
+
+ private val workerCount: Int = 1
+ private var _master: ActorRef = null
+ private var _system: ActorSystem = null
+ private var _config: Config = null
+
+ private val LOG = LogUtil.getLogger(getClass)
+
+ def start(): Unit = {
+ val port = Util.findFreePort().get
+ val akkaConf = getConfig(inputConfig, port)
+ _config = akkaConf
+ val system = ActorSystem(MASTER, akkaConf)
+
+ val master = system.actorOf(Props[MasterActor], MASTER)
+
+ 0.until(workerCount).foreach { id =>
+ system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
+ }
+ this._master = master
+ this._system = system
+
+ LOG.info("=================================")
+ LOG.info("Local Cluster is started at: ")
+ LOG.info(s" 127.0.0.1:$port")
+ LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+ }
+
+ private def getConfig(inputConfig: Config, port: Int): Config = {
+ val config = inputConfig.
+ withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
+ withValue(GEARPUMP_CLUSTER_MASTERS,
+ ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
+ withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
+ ConfigValueFactory.fromAnyRef(true)).
+ withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
+ withValue("akka.actor.provider",
+ ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
+ config
+ }
+
+ def newClientContext: ClientContext = {
+ ClientContext(_config, _system, _master)
+ }
+
+ def stop(): Unit = {
+ _system.stop(_master)
+ _system.terminate()
+ Await.result(_system.whenTerminated, Duration.Inf)
+ }
+}
+
+object EmbeddedCluster {
+ def apply(): EmbeddedCluster = {
+ new EmbeddedCluster(ClusterConfig.master())
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
new file mode 100644
index 0000000..db2cd8a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.{Master => MasterActor}
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+object Local extends AkkaApp with ArgumentsParser {
+ override def akkaConfig: Config = ClusterConfig.master()
+
+ var LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val options: Array[(String, CLIOption[Any])] =
+ Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
+ "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
+ defaultValue = Some(2)))
+
+ override val description = "Start a local cluster"
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+
+ this.LOG = {
+ LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL)
+ LogUtil.getLogger(getClass)
+ }
+
+ val config = parse(args)
+ if (null != config) {
+ local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
+ }
+ }
+
+ def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
+ if (sameProcess) {
+ LOG.info("Starting local in same process")
+ System.setProperty("LOCAL", "true")
+ }
+ val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+ .asScala.flatMap(Util.parseHostList)
+ val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
+
+ if (masters.size != 1 && masters.head.host != local) {
+ LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " +
+ s"with ${Constants.GEARPUMP_HOSTNAME}")
+ } else {
+
+ val hostPort = masters.head
+ implicit val system = ActorSystem(MASTER, akkaConf.
+ withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port))
+ )
+
+ val master = system.actorOf(Props[MasterActor], MASTER)
+ val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER"
+
+ 0.until(workerCount).foreach { id =>
+ system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
+ }
+
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
new file mode 100644
index 0000000..f758720
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.cluster.ClusterEvent._
+import akka.cluster.{MemberStatus, Member, Cluster}
+import akka.cluster.ddata.DistributedData
+import akka.cluster.singleton.{ClusterSingletonProxySettings, ClusterSingletonProxy, ClusterSingletonManagerSettings, ClusterSingletonManager}
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.Master.MasterListUpdated
+import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Command line tool that starts a Master daemon.
+ *
+ * It creates the master actor system, starts a cluster singleton manager hosting
+ * `MasterWatcher`, starts a singleton proxy registered under the MASTER name, and then
+ * blocks until the actor system terminates.
+ */
+object Master extends AkkaApp with ArgumentsParser {
+
+  private var LOG: Logger = LogUtil.getLogger(getClass)
+
+  override def akkaConfig: Config = ClusterConfig.master()
+
+  override val options: Array[(String, CLIOption[Any])] =
+    Array("ip" -> CLIOption[String]("<master ip address>", required = true),
+      "port" -> CLIOption("<master port>", required = true))
+
+  override val description = "Start Master daemon"
+
+  /**
+   * Loads the logging configuration for the MASTER process type, parses the command line
+   * and starts the master on the requested ip and port.
+   *
+   * @param akkaConf configuration used for logging and for the master actor system
+   * @param args raw command line arguments ("ip" and "port" are read from them)
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    this.LOG = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
+      LogUtil.getLogger(getClass)
+    }
+
+    val config = parse(args)
+    // Guard against a null parse result instead of failing with a NullPointerException.
+    if (null != config) {
+      master(config.getString("ip"), config.getInt("port"), akkaConf)
+    }
+  }
+
+  /** Returns true when "master:port" is one of the configured master entries. */
+  private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = {
+    masters.exists { hostPort =>
+      hostPort == s"$master:$port"
+    }
+  }
+
+  /**
+   * Starts the master actor system on `ip`:`port` and blocks until it terminates.
+   *
+   * The process exits with status -1 when `ip`:`port` is not listed in
+   * `Constants.GEARPUMP_CLUSTER_MASTERS`.
+   */
+  private def master(ip: String, port: Int, akkaConf: Config): Unit = {
+    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+
+    if (!verifyMaster(ip, port, masters)) {
+      LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
+        s"gearpump.cluster.masters: ${masters.mkString(", ")}")
+      System.exit(-1)
+    }
+
+    // Every configured master becomes a seed node; the quorum is a strict majority of them.
+    val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
+    val quorum = masterList.size() / 2 + 1
+    val masterConfig = akkaConf.
+      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
+      withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
+      withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
+      withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
+        ConfigValueFactory.fromAnyRef(quorum))
+
+    LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
+    val system = ActorSystem(MASTER, masterConfig)
+
+    val replicator = DistributedData(system).replicator
+    LOG.info(s"Replicator path: ${replicator.path}")
+
+    // Starts singleton manager
+    val singletonManager = system.actorOf(ClusterSingletonManager.props(
+      singletonProps = Props(classOf[MasterWatcher], MASTER),
+      terminationMessage = PoisonPill,
+      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
+        .withRole(MASTER)),
+      name = SINGLETON_MANAGER)
+
+    // Start master proxy
+    val masterProxy = system.actorOf(ClusterSingletonProxy.props(
+      singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
+      // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
+      // Master is created when there is a majority of machines started.
+      settings = ClusterSingletonProxySettings(system)
+        .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
+      name = MASTER
+    )
+
+    LOG.info(s"master proxy is started at ${masterProxy.path}")
+
+    val mainThread = Thread.currentThread()
+    // On JVM shutdown: stop the proxy, leave the cluster, wait up to 3 seconds for the
+    // actor system to terminate, then terminate it and wait for the main thread to finish.
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        if (!system.whenTerminated.isCompleted) {
+          LOG.info("Triggering shutdown hook....")
+
+          system.stop(masterProxy)
+          val cluster = Cluster(system)
+          cluster.leave(cluster.selfAddress)
+          cluster.down(cluster.selfAddress)
+          try {
+            Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
+          } catch {
+            case ex: Exception => // Ignore
+          }
+          system.terminate()
+          mainThread.join()
+        }
+      }
+    })
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
+
+/**
+ * Watches the cluster members carrying `role`.
+ *
+ * Once the initial cluster state shows at least a quorum of such members in Up status, a
+ * Master actor is created as a child and is informed of every membership change. When the
+ * number of known members drops below the quorum, this actor leaves the cluster and
+ * terminates the actor system.
+ *
+ * @param role cluster role a member must have to be counted
+ */
+class MasterWatcher(role: String) extends Actor with ActorLogging {
+  import context.dispatcher
+
+  val cluster = Cluster(context.system)
+
+  val config = context.system.settings.config
+  val masters = config.getList("akka.cluster.seed-nodes")
+  // A strict majority of the configured seed nodes.
+  val quorum = masters.size() / 2 + 1
+
+  val system = context.system
+
+  // Sorts by age, oldest first
+  val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
+  var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
+
+  // The effective behavior is installed in preStart via context.become.
+  def receive: Receive = null
+
+  // Subscribes to MemberEvent, re-subscribe when restart
+  override def preStart(): Unit = {
+    cluster.subscribe(self, classOf[MemberEvent])
+    context.become(waitForInit)
+  }
+  override def postStop(): Unit = {
+    cluster.unsubscribe(self)
+  }
+
+  /** Returns true when `member` carries the role this watcher was created for. */
+  def matchingRole(member: Member): Boolean = member.hasRole(role)
+
+  /**
+   * Initial behavior: waits for the cluster state snapshot, then either starts the Master
+   * (quorum reached) or begins shutting down (quorum missing).
+   */
+  def waitForInit: Receive = {
+    case state: CurrentClusterState => {
+      membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
+        m.status == MemberStatus.Up && matchingRole(m))
+
+      if (membersByAge.size < quorum) {
+        shutdownOnQuorumLoss()
+      } else {
+        val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
+        notifyMasterMembersChange(master)
+        context.become(waitForClusterEvent(master))
+      }
+    }
+  }
+
+  /**
+   * Steady-state behavior: tracks members joining and leaving, forwards the updated member
+   * list to `master`, and begins shutting down when the quorum is lost.
+   */
+  def waitForClusterEvent(master: ActorRef): Receive = {
+    case MemberUp(m) if matchingRole(m) => {
+      membersByAge += m
+      notifyMasterMembersChange(master)
+    }
+    case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] ||
+      mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
+      log.info(s"member removed ${mEvent.member}")
+      val m = mEvent.member
+      membersByAge -= m
+      if (membersByAge.size < quorum) {
+        shutdownOnQuorumLoss()
+      } else {
+        notifyMasterMembersChange(master)
+      }
+    }
+  }
+
+  /** Logs the missing quorum, switches to the shutdown behavior and triggers it. */
+  private def shutdownOnQuorumLoss(): Unit = {
+    log.info(s"We cannot get a quorum, $quorum, " +
+      s"shutting down...${membersByAge.iterator.mkString(",")}")
+    context.become(waitForShutdown)
+    self ! MasterWatcher.Shutdown
+  }
+
+  /** Sends the current member list, oldest first, to `master`. */
+  private def notifyMasterMembersChange(master: ActorRef): Unit = {
+    val masterNodes = membersByAge.toList.map{ member =>
+      MasterNode(member.address.host.getOrElse("Unknown-Host"),
+        member.address.port.getOrElse(0))
+    }
+    master ! MasterListUpdated(masterNodes)
+  }
+
+  /**
+   * Shutdown behavior: leaves the cluster, stops this actor and terminates the actor
+   * system after waiting up to 3 seconds for it to terminate on its own.
+   */
+  def waitForShutdown: Receive = {
+    case MasterWatcher.Shutdown => {
+      cluster.unsubscribe(self)
+      cluster.leave(cluster.selfAddress)
+      context.stop(self)
+      system.scheduler.scheduleOnce(Duration.Zero) {
+        try {
+          Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
+        } catch {
+          case ex: Exception => // Ignore
+        }
+        system.terminate()
+      }
+    }
+  }
+}
+
+object MasterWatcher {
+  /** Message a MasterWatcher sends to itself to start shutting down. */
+  case object Shutdown
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
new file mode 100644
index 0000000..3d8d823
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/** Command line entry point that launches a worker daemon process. */
+object Worker extends AkkaApp with ArgumentsParser {
+  protected override def akkaConfig = ClusterConfig.worker()
+
+  override val description = "Start a worker daemon"
+
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  private def uuid = java.util.UUID.randomUUID.toString
+
+  /**
+   * Creates an actor system named after a fresh UUID, starts a master proxy for the
+   * configured masters and a worker actor bound to it, then blocks until the actor
+   * system terminates.
+   *
+   * @param akkaConf configuration used for logging, the actor system and the master list
+   * @param args raw command line arguments (not read by this method)
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val workerSystemName = uuid
+
+    // Load the log configuration first and only then create the logger: the log file
+    // name is reset by the configuration, and an early logger would leave an empty file.
+    LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
+    this.LOG = LogUtil.getLogger(getClass)
+
+    val system = ActorSystem(workerSystemName, akkaConf)
+
+    // Each configured master entry is split on ":" into host and port.
+    val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { entry =>
+      val parts = entry.split(":")
+      HostPort(parts(0), parts(1).toInt)
+    }
+
+    LOG.info(s"Trying to connect to masters ${masterAddress.mkString(",")}...")
+    val proxyName = s"masterproxy${system.name}"
+    val masterProxy = system.actorOf(MasterProxy.props(masterAddress), proxyName)
+
+    val workerName = classOf[WorkerActor].getSimpleName + workerSystemName
+    system.actorOf(Props(classOf[WorkerActor], masterProxy), workerName)
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
new file mode 100644
index 0000000..0ae7365
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import akka.actor._
+import akka.pattern.ask
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
+import org.apache.gearpump.cluster.master.Master._
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+
+/**
+ * AppManager is a dedicated child of Master to manage all applications.
+ *
+ * On creation it asks `kvService` for the stored master state and stashes every other
+ * message until the answer arrives. Afterwards it handles application submission, restart
+ * and shutdown, AppMaster registration, per-application data storage and recovery of
+ * terminated AppMasters.
+ *
+ * @param kvService actor used to load and store master and application state
+ * @param launcher factory for the props of the actors that launch an AppMaster
+ */
+private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
+  extends Actor with Stash with TimeOutScheduler {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID
+  private val appMasterMaxRetries: Int = 5
+  private val appMasterRetryTimeRange: Duration = 20.seconds
+
+  implicit val timeout = FUTURE_TIMEOUT
+  implicit val executionContext = context.dispatcher
+
+  // Next available appId
+  private var nextAppId: Int = 1
+
+  // From appId to appMaster data
+  // Applications not in activeAppMasters or deadAppMasters are in pending status
+  private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
+
+  // Active appMaster list where applications are in active status
+  private var activeAppMasters = Set.empty[Int]
+
+  // Dead appMaster list where applications are in inactive status
+  private var deadAppMasters = Set.empty[Int]
+
+  // Restart policies per appId. They are not part of MasterState, so they are NOT
+  // restored when the master state is loaded from the KV service.
+  private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
+
+  // The effective behavior is installed right below via context.become.
+  def receive: Receive = null
+
+  kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
+  context.become(waitForMasterState)
+
+  /**
+   * Initial behavior: restores the stored master state (when there is one) and then
+   * switches to `receiveHandler`. All unrelated messages are stashed meanwhile.
+   */
+  def waitForMasterState: Receive = {
+    case GetKVSuccess(_, result) =>
+      val masterState = result.asInstanceOf[MasterState]
+      if (masterState != null) {
+        this.nextAppId = masterState.maxId + 1
+        this.activeAppMasters = masterState.activeAppMasters
+        this.deadAppMasters = masterState.deadAppMasters
+        this.appMasterRegistry = masterState.appMasterRegistry
+      }
+      context.become(receiveHandler)
+      unstashAll()
+    case GetKVFailed(ex) =>
+      LOG.error("Failed to get master state, shutting down master to avoid data corruption...")
+      context.parent ! PoisonPill
+    case msg =>
+      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
+      stash()
+  }
+
+  /** Steady-state behavior: the combination of all message handlers of this actor. */
+  def receiveHandler: Receive = {
+    val msg = "Application Manager started. Ready for application submission..."
+    LOG.info(msg)
+    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
+      appDataStoreService orElse terminationWatch
+  }
+
+  /** Handles requests coming from clients: submit, restart, shutdown and queries. */
+  def clientMsgHandler: Receive = {
+    case SubmitApplication(app, jar, username) =>
+      LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...")
+      val client = sender()
+      if (applicationNameExist(app.name)) {
+        client ! SubmitApplicationResult(Failure(
+          new Exception(s"Application name ${app.name} already existed")))
+      } else {
+        context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent,
+          Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
+
+        val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null)
+        appMasterRestartPolicies += nextAppId ->
+          new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
+        kvService ! PutKV(nextAppId.toString, APP_STATE, appState)
+        nextAppId += 1
+      }
+
+    case RestartApplication(appId) =>
+      // Capture the sender now: the callback below runs after this message was handled.
+      val client = sender()
+      (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+        case GetKVSuccess(_, result) =>
+          val appState = result.asInstanceOf[ApplicationState]
+          if (appState != null) {
+            LOG.info(s"Shutting down the application (restart), $appId")
+            self ! ShutdownApplication(appId)
+            self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client)
+          } else {
+            client ! SubmitApplicationResult(Failure(
+              new Exception(s"Failed to restart, because the application $appId does not exist.")
+            ))
+          }
+        case GetKVFailed(ex) =>
+          client ! SubmitApplicationResult(Failure(
+            new Exception(s"Unable to obtain the Master State. " +
+              s"Application $appId will not be restarted.")
+          ))
+      }
+
+    case ShutdownApplication(appId) =>
+      LOG.info(s"App Manager Shutting down application $appId")
+      // Only applications that are registered and not already dead can be shut down.
+      val (_, appInfo) = appMasterRegistry.get(appId)
+        .filter { case (_, info) => !deadAppMasters.contains(info.appId)}
+        .getOrElse((null, null))
+      Option(appInfo) match {
+        case Some(info) =>
+          val worker = info.worker
+          val workerPath = Option(worker).map(_.path).orNull
+          LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId: $EXECUTOR_ID")
+          cleanApplicationData(appId)
+          val shutdown = ShutdownExecutor(appId, EXECUTOR_ID,
+            s"AppMaster $appId shutdown requested by master...")
+          sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
+          sender ! ShutdownApplicationResult(Success(appId))
+        case None =>
+          val errorMsg = s"Failed to find registration information for appId: $appId"
+          LOG.error(errorMsg)
+          sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
+      }
+
+    case ResolveAppId(appId) =>
+      val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
+      if (null != appMaster) {
+        sender ! ResolveAppIdResult(Success(appMaster))
+      } else {
+        sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
+      }
+
+    case AppMastersDataRequest =>
+      // One AppMasterData entry per registered application, in registry iteration order.
+      val appMastersData = appMasterRegistry.toList.map { case (id, (appMaster, info)) =>
+        val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+        val workerPath = Option(info.worker).map(worker =>
+          ActorUtil.getFullPath(context.system, worker.path))
+        val status = getAppMasterStatus(id)
+        AppMasterData(
+          status, id, info.appName, appMasterPath, workerPath.orNull,
+          info.submissionTime, info.startTime, info.finishTime, info.user)
+      }
+
+      sender ! AppMastersData(appMastersData)
+
+    case QueryAppMasterConfig(appId) =>
+      val config =
+        if (appMasterRegistry.contains(appId)) {
+          val (_, info) = appMasterRegistry(appId)
+          info.config
+        } else {
+          null
+        }
+      sender ! AppMasterConfig(config)
+
+    case appMasterDataRequest: AppMasterDataRequest =>
+      val appId = appMasterDataRequest.appId
+      val appStatus = getAppMasterStatus(appId)
+
+      appStatus match {
+        case AppMasterNonExist =>
+          sender ! AppMasterData(AppMasterNonExist)
+        case _ =>
+          val (appMaster, info) = appMasterRegistry(appId)
+          val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+          val workerPath = Option(info.worker).map(
+            worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
+          sender ! AppMasterData(
+            appStatus, appId, info.appName, appMasterPath, workerPath,
+            info.submissionTime, info.startTime, info.finishTime, info.user)
+      }
+  }
+
+  /** Logs the outcome of executor shutdown requests answered by workers. */
+  def workerMessage: Receive = {
+    case ShutdownExecutorSucceed(appId, executorId) =>
+      LOG.info(s"Shut down executor $executorId for application $appId successfully")
+    case failed: ShutdownExecutorFailed =>
+      LOG.error(failed.reason)
+  }
+
+  /** Derives the status of `appId` from the active set, the dead set and the registry. */
+  private def getAppMasterStatus(appId: Int): AppMasterStatus = {
+    if (activeAppMasters.contains(appId)) {
+      AppMasterActive
+    } else if (deadAppMasters.contains(appId)) {
+      AppMasterInActive
+    } else if (appMasterRegistry.contains(appId)) {
+      AppMasterPending
+    } else {
+      AppMasterNonExist
+    }
+  }
+
+  private def shutDownExecutorTimeOut(): Unit = {
+    LOG.error(s"Shut down executor time out")
+  }
+
+  /** Handles AppMaster registration and activation, persisting the master state each time. */
+  def appMasterMessage: Receive = {
+    case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
+      val startTime = System.currentTimeMillis()
+      val register = registerBack.copy(startTime = startTime)
+
+      LOG.info(s"Register AppMaster for app: ${register.appId}, $register")
+      context.watch(appMaster)
+      appMasterRegistry += register.appId -> (appMaster, register)
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+      sender ! AppMasterRegistered(register.appId)
+
+    case ActivateAppMaster(appId) =>
+      LOG.info(s"Activate AppMaster for app $appId")
+      activeAppMasters += appId
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+      sender ! AppMasterActivated(appId)
+  }
+
+  /** Stores and loads per-application data in the KV group named after the appId. */
+  def appDataStoreService: Receive = {
+    case SaveAppData(appId, key, value) =>
+      val client = sender()
+      (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map {
+        case PutKVSuccess =>
+          client ! AppDataSaved
+        case PutKVFailed(k, ex) =>
+          client ! SaveAppDataFailed
+      }
+    case GetAppData(appId, key) =>
+      val client = sender()
+      (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map {
+        case GetKVSuccess(privateKey, value) =>
+          client ! GetAppDataResult(key, value)
+        case GetKVFailed(ex) =>
+          client ! GetAppDataResult(key, null)
+      }
+  }
+
+  /**
+   * Reacts to the termination of a watched AppMaster: when stored application state is
+   * still available for it, a recovery of the application is requested.
+   */
+  def terminationWatch: Receive = {
+    case terminate: Terminated =>
+      LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " +
+        s"network down: ${terminate.getAddressTerminated}")
+
+      // Now we assume that the only normal way to stop the application is submitting a
+      // ShutdownApplication request
+      val application = appMasterRegistry.find { appInfo =>
+        val (_, (actorRef, _)) = appInfo
+        actorRef.compareTo(terminate.actor) == 0
+      }
+      if (application.nonEmpty) {
+        val appId = application.get._1
+        (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+          case GetKVSuccess(_, result) =>
+            val appState = result.asInstanceOf[ApplicationState]
+            if (appState != null) {
+              LOG.info(s"Recovering application, $appId")
+              self ! RecoverApplication(appState)
+            } else {
+              LOG.error(s"Cannot find application state for $appId")
+            }
+          case GetKVFailed(ex) =>
+            LOG.error(s"Cannot find master state to recover")
+        }
+      }
+  }
+
+  /** Handles messages this actor sends to itself. */
+  def selfMsgHandler: Receive = {
+    case RecoverApplication(state) =>
+      val appId = state.appId
+      if (restartPolicyFor(appId).allowRestart) {
+        LOG.info(s"AppManager Recovering Application $appId...")
+        activeAppMasters -= appId
+        kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+          MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+        context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username,
+          context.parent, None), s"launcher${appId}_${Util.randInt()}")
+      } else {
+        LOG.error(s"Application $appId failed too many times")
+      }
+  }
+
+  /**
+   * Returns the restart policy of `appId`, creating and remembering a fresh one when none
+   * is known. Policies are only created on submission and are not part of the stored
+   * MasterState, so an application restored from the KV service has no entry; an
+   * unconditional lookup would throw NoSuchElementException for it.
+   */
+  private def restartPolicyFor(appId: Int): RestartPolicy = {
+    appMasterRestartPolicies.getOrElse(appId, {
+      val policy = new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
+      appMasterRestartPolicies += appId -> policy
+      policy
+    })
+  }
+
+  case class RecoverApplication(applicationStatus: ApplicationState)
+
+  /**
+   * Marks a registered application as dead, records its finish time, persists the master
+   * state and deletes the application's KV group.
+   */
+  private def cleanApplicationData(appId: Int): Unit = {
+    if (appMasterRegistry.contains(appId)) {
+      // Add the dead app to dead appMasters
+      deadAppMasters += appId
+      // Remove the dead app from active appMasters
+      activeAppMasters -= appId
+
+      appMasterRegistry += appId -> {
+        val (ref, info) = appMasterRegistry(appId)
+        (ref, info.copy(finishTime = System.currentTimeMillis()))
+      }
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+      kvService ! DeleteKVGroup(appId.toString)
+    }
+  }
+
+  /** Returns true when a registered application that is not dead already uses `appName`. */
+  private def applicationNameExist(appName: String): Boolean = {
+    appMasterRegistry.values.exists { case (_, info) =>
+      info.appName == appName && !deadAppMasters.contains(info.appId)
+    }
+  }
+}
+
+/** Keys and state container shared by the AppManager and its KV storage. */
+object AppManager {
+  /** KV key under which the state of a single application is stored. */
+  final val APP_STATE = "app_state"
+
+  /** KV key under which the master state is stored. */
+  final val MASTER_STATE = "master_state"
+
+  /**
+   * Snapshot of the AppManager bookkeeping.
+   *
+   * @param maxId application id counter at the time of the snapshot
+   * @param appMasterRegistry AppMaster reference and runtime info per application id
+   * @param activeAppMasters ids of applications in active status
+   * @param deadAppMasters ids of applications in inactive status
+   */
+  case class MasterState(
+      maxId: Int,
+      appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
+      activeAppMasters: Set[Int],
+      deadAppMasters: Set[Int])
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
new file mode 100644
index 0000000..fd19bad
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.ddata.{LWWMap, LWWMapKey, DistributedData}
+import akka.cluster.ddata.Replicator._
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.Duration
+
+/**
+ * A replicated simple in-memory KV service. The replications are stored on all masters.
+ *
+ * Every group is kept in its own replicated map. Reads and writes go through the
+ * distributed-data replicator with majority consistency, and the replicator's answers are
+ * translated back into GetKV / PutKV results for the requesting client.
+ */
+class InMemoryKVService extends Actor with Stash {
+  import org.apache.gearpump.cluster.master.InMemoryKVService._
+
+  private val KV_SERVICE = "gearpump_kvservice"
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val replicator = DistributedData(context.system).replicator
+  private implicit val cluster = Cluster(context.system)
+
+  // Optimize write path, we can tolerate one master down for recovery.
+  private val timeout = Duration(15, TimeUnit.SECONDS)
+  private val readMajority = ReadMajority(timeout)
+  private val writeMajority = WriteMajority(timeout)
+
+  /** Builds the replicated-map key of a group: the service prefix, "_" and the group name. */
+  private def groupKey(group: String): LWWMapKey[Any] = {
+    LWWMapKey[Any](KV_SERVICE + "_" + group)
+  }
+
+  def receive: Receive = kvService
+
+  /**
+   * Handles client requests and the matching replicator responses. The original client and
+   * key travel with each Get/Update as a `Request`, so the response can be routed back.
+   */
+  def kvService: Receive = {
+
+    case GetKV(group: String, key: String) =>
+      val request = Request(sender(), key)
+      replicator ! Get(groupKey(group), readMajority, Some(request))
+    case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      val appData = success.get(group)
+      LOG.info(s"Successfully retrieved group: ${group.id}")
+      // A key missing from an existing group is reported as a successful null value.
+      request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
+    case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      LOG.info(s"We cannot find group $group")
+      // An unknown group is reported as a successful null value as well.
+      request.client ! GetKVSuccess(request.key, null)
+    case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      val error = s"Failed to get application data, the request key is ${request.key}"
+      LOG.error(error)
+      request.client ! GetKVFailed(new Exception(error))
+
+    case PutKV(group: String, key: String, value: Any) =>
+      val request = Request(sender(), key)
+      val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map =>
+        map + (key -> value)
+      }
+      replicator ! update
+    case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      request.client ! PutKVSuccess
+    case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
+      request.client ! PutKVFailed(request.key, new Exception(error, cause))
+    case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      request.client ! PutKVFailed(request.key, new TimeoutException())
+
+    // Deletion is fire-and-forget: the outcome is only logged, no reply is sent.
+    case DeleteKVGroup(group: String) =>
+      replicator ! Delete(groupKey(group), writeMajority)
+    case DeleteSuccess(group) =>
+      LOG.info(s"KV Group ${group.id} is deleted")
+    case ReplicationDeleteFailure(group) =>
+      LOG.error(s"Failed to delete KV Group ${group.id}...")
+    case DataDeleted(group) =>
+      LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
+  }
+}
+
+/** Message protocol of the KV service. */
+object InMemoryKVService {
+
+  // ---- Requests ----
+
+  /** Asks for the value stored under `key` in `group`. */
+  case class GetKV(group: String, key: String)
+
+  /** Stores `value` under `key` in `group`. */
+  case class PutKV(group: String, key: String, value: Any)
+
+  /** Deletes the whole `group`. */
+  case class DeleteKVGroup(group: String)
+
+  // ---- Result markers ----
+
+  /** Marker for answers to a GetKV request. */
+  trait GetKVResult
+
+  /** Marker for answers to a PutKV request. */
+  trait PutKVResult
+
+  // ---- Results ----
+
+  /** Successful read; `value` is whatever the service found for `key`. */
+  case class GetKVSuccess(key: String, value: Any) extends GetKVResult
+
+  /** Failed read, carrying the cause. */
+  case class GetKVFailed(ex: Throwable) extends GetKVResult
+
+  /** Successful write. */
+  case object PutKVSuccess extends PutKVResult
+
+  /** Failed write of `key`, carrying the cause. */
+  case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
+
+  /** Result valid for both reads and writes, naming a deleted `group`. */
+  case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
+
+  // ---- Internal bookkeeping ----
+
+  /** Remembers which `client` asked about which `key`. */
+  case class Request(client: ActorRef, key: String)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
new file mode 100644
index 0000000..6b4df07
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.lang.management.ManagementFactory
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.jarstore.JarStoreServer
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+import akka.actor._
+import akka.remote.DisassociatedEvent
+import com.typesafe.config.Config
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster._
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.MasterToAppMaster._
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
+import org.apache.gearpump.cluster.MasterToWorker._
+import org.apache.gearpump.cluster.WorkerToMaster._
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util._
+
+/**
+ * Master Actor who manages resources of the whole cluster.
+ * It is like the resource manager of YARN.
+ *
+ * While constructing, the actor asks the KV service for the persisted next worker id and
+ * stashes every other message until the reply arrives. Afterwards it routes worker,
+ * AppMaster and client messages to the scheduler, the AppManager and its other children.
+ */
+private[cluster] class Master extends Actor with Stash {
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+ private val systemConfig: Config = context.system.settings.config
+ private implicit val timeout = Constants.FUTURE_TIMEOUT
+ private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")
+ // Resources and resourceRequests can be dynamically constructed by
+ // heartbeat of worker and appmaster when master singleton is migrated.
+ // We don't need to persist them in cluster
+ // appManager and scheduler are created in preStart().
+ private var appManager: ActorRef = null
+
+ private var scheduler: ActorRef = null
+
+ // Registered workers, keyed by the watched worker actor.
+ private var workers = new immutable.HashMap[ActorRef, WorkerId]
+
+ private val birth = System.currentTimeMillis()
+
+ // Id handed to the next newly registered worker; restored from the KV service on start.
+ private var nextWorkerId = 0
+
+ // Placeholder only: the constructor below switches to waitForNextWorkerId via context.become.
+ def receive: Receive = null
+
+ // Register jvm metrics
+ Metrics(context.system).register(new JvmMetricsSet("master"))
+
+ LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...")
+
+ val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
+
+ private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath))
+
+ private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
+
+ // Maintain the list of active masters.
+ private var masters: List[MasterNode] = {
+ // Add myself into the list of initial masters.
+ List(MasterNode(hostPort.host, hostPort.port))
+ }
+
+ val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+
+ val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+ // Some(history metrics actor) when metrics are enabled in the configuration, None otherwise.
+ val historyMetricsService = if (metricsEnabled) {
+ val service = {
+ context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))
+ }
+
+ val metricsReportService = context.actorOf(
+ Props(new MetricsReporterService(Metrics(context.system))))
+ service.tell(ReportMetrics, metricsReportService)
+ Some(service)
+ } else {
+ None
+ }
+
+ // Ask for the persisted worker id counter and wait for the answer before serving anything.
+ kvService ! GetKV(MASTER_GROUP, WORKER_ID)
+ context.become(waitForNextWorkerId)
+
+ /**
+ * Initial behavior: waits for the KV service reply to the GetKV sent by the constructor.
+ * All unrelated messages are stashed and replayed once the normal handler is installed.
+ */
+ def waitForNextWorkerId: Receive = {
+ case GetKVSuccess(_, result) =>
+ if (result != null) {
+ this.nextWorkerId = result.asInstanceOf[Int]
+ } else {
+ LOG.warn("Cannot find existing state in the distributed cluster...")
+ }
+ context.become(receiveHandler)
+ unstashAll()
+ case GetKVFailed(ex) =>
+ // Hand the cause to the logger so the reason of the failure is not lost.
+ LOG.error("Failed to get worker id, shutting down master to avoid data corruption...", ex)
+ context.parent ! PoisonPill
+ case msg =>
+ LOG.info(s"Get message ${msg.getClass.getSimpleName}")
+ stash()
+ }
+
+ /** Normal behavior: the partial handlers below are tried in the listed order. */
+ def receiveHandler: Receive = workerMsgHandler orElse
+ appMasterMsgHandler orElse
+ onMasterListChange orElse
+ clientMsgHandler orElse
+ metricsService orElse
+ jarStoreService orElse
+ terminationWatch orElse
+ disassociated orElse
+ kvServiceMsgHandler orElse
+ ActorUtil.defaultMsgHandler(self)
+
+ /** Handles worker registration and forwards resource updates to the scheduler. */
+ def workerMsgHandler: Receive = {
+ case RegisterNewWorker =>
+ val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
+ nextWorkerId += 1
+ // Store the advanced counter in the KV service; it is read back on construction.
+ kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
+ val workerHostname = ActorUtil.getHostname(sender())
+ LOG.info(s"Register new from $workerHostname ....")
+ // Forwarding keeps the worker as the sender of the follow-up RegisterWorker message.
+ self forward RegisterWorker(workerId)
+
+ case RegisterWorker(id) =>
+ val worker = sender()
+ context.watch(worker)
+ val registered = WorkerRegistered(id, MasterInfo(self, birth))
+ worker ! registered
+ scheduler forward registered
+ workers += (worker -> id)
+ val workerHostname = ActorUtil.getHostname(worker)
+ LOG.info(s"Register Worker with id $id from $workerHostname ....")
+ case resourceUpdate: ResourceUpdate =>
+ scheduler forward resourceUpdate
+ }
+
+ /** Forwards jar store lookups to the jar store child actor. */
+ def jarStoreService: Receive = {
+ case GetJarStoreServer =>
+ jarStore forward GetJarStoreServer
+ }
+
+ /** Handles the replies to the PutKV requests issued by this actor. */
+ def kvServiceMsgHandler: Receive = {
+ case PutKVSuccess =>
+ // Skip
+ case PutKVFailed(key, exception) =>
+ LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
+ ExceptionUtils.getStackTrace(exception))
+ }
+
+ /** Answers history metrics queries, either through the metrics child or with an empty list. */
+ def metricsService: Receive = {
+ case query: QueryHistoryMetrics =>
+ historyMetricsService match {
+ case Some(service) =>
+ service forward query
+ case None =>
+ // Returns empty metrics so that we don't hang the UI
+ sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+ }
+ }
+
+ /** Handles AppMaster messages; most are forwarded to the scheduler or the AppManager. */
+ def appMasterMsgHandler: Receive = {
+ case request: RequestResource =>
+ scheduler forward request
+ case registerAppMaster: RegisterAppMaster =>
+ appManager forward registerAppMaster
+ case activateAppMaster: ActivateAppMaster =>
+ appManager forward activateAppMaster
+ case save: SaveAppData =>
+ appManager forward save
+ case get: GetAppData =>
+ appManager forward get
+ case GetAllWorkers =>
+ sender ! WorkerList(workers.values.toList)
+ case GetMasterData =>
+ val aliveFor = System.currentTimeMillis() - birth
+ val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+ val userDir = System.getProperty("user.dir")
+
+ val masterDescription =
+ MasterSummary(
+ MasterNode(hostPort.host, hostPort.port),
+ masters,
+ aliveFor,
+ logFileDir,
+ jarStoreRootPath,
+ MasterStatus.Synced,
+ userDir,
+ List.empty[MasterActivity],
+ jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+ historyMetricsConfig = getHistoryMetricsConfig
+ )
+
+ sender ! MasterData(masterDescription)
+
+ case invalidAppMaster: InvalidAppMaster =>
+ appManager forward invalidAppMaster
+ }
+
+ import scala.util.{Failure, Success}
+
+ /** Replaces the known list of masters when an update is published. */
+ def onMasterListChange: Receive = {
+ case MasterListUpdated(masters: List[MasterNode]) =>
+ this.masters = masters
+ }
+
+ /** Handles client requests; application related ones are forwarded to the AppManager. */
+ def clientMsgHandler: Receive = {
+ case app: SubmitApplication =>
+ LOG.debug(s"Receive from client, SubmitApplication $app")
+ appManager.forward(app)
+ case app: RestartApplication =>
+ LOG.debug(s"Receive from client, RestartApplication $app")
+ appManager.forward(app)
+ case app: ShutdownApplication =>
+ LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
+ scheduler ! ApplicationFinished(app.appId)
+ appManager.forward(app)
+ case app: ResolveAppId =>
+ LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef")
+ appManager.forward(app)
+ case resolve: ResolveWorkerId =>
+ LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
+ workers.find(_._2 == resolve.workerId) match {
+ case Some((workerRef, _)) => sender ! ResolveWorkerIdResult(Success(workerRef))
+ case None => sender ! ResolveWorkerIdResult(Failure(
+ new Exception(s"cannot find worker ${resolve.workerId}")))
+ }
+ case AppMastersDataRequest =>
+ LOG.debug("Master received AppMastersDataRequest")
+ appManager forward AppMastersDataRequest
+ case appMasterDataRequest: AppMasterDataRequest =>
+ LOG.debug("Master received AppMasterDataRequest")
+ appManager forward appMasterDataRequest
+ case query: QueryAppMasterConfig =>
+ LOG.debug("Master received QueryAppMasterConfig")
+ appManager forward query
+ case QueryMasterConfig =>
+ sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+ }
+
+ /** Logs remoting disassociation events this actor subscribed to in preStart(). */
+ def disassociated: Receive = {
+ case disassociated: DisassociatedEvent =>
+ LOG.info(s" disassociated ${disassociated.remoteAddress}")
+ }
+
+ /** Removes a terminated worker and tells the scheduler to drop its resources. */
+ def terminationWatch: Receive = {
+ case t: Terminated =>
+ val actor = t.actor
+ LOG.info(s"worker ${actor.path} get terminated, is it due to network reason? " +
+ t.getAddressTerminated())
+
+ LOG.info("Let's filter out dead resources...")
+ // Filters out dead worker resource
+ workers.get(actor).foreach { workerId =>
+ scheduler ! WorkerTerminated(workerId)
+ workers -= actor
+ }
+ }
+
+ /**
+ * Creates the AppManager and the scheduler (whose class is read from the configuration)
+ * and subscribes to DisassociatedEvent on the event stream.
+ */
+ override def preStart(): Unit = {
+ val path = ActorUtil.getFullPath(context.system, self.path)
+ LOG.info(s"master path is $path")
+ val schedulerClass = Class.forName(
+ systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
+
+ appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
+ classOf[AppManager].getSimpleName)
+ scheduler = context.actorOf(Props(schedulerClass))
+ context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
+ }
+}
+
+/** Constants and messages used by the Master actor. */
+object Master {
+ // KV service group under which the master stores its own state.
+ final val MASTER_GROUP = "master_group"
+
+ // Key, inside MASTER_GROUP, of the id to assign to the next newly registered worker.
+ final val WORKER_ID = "next_worker_id"
+
+ // Sent by the master to the scheduler when a registered worker actor has terminated.
+ case class WorkerTerminated(workerId: WorkerId)
+
+ // Identifies a master actor; the Master class fills startTime with its creation time.
+ case class MasterInfo(master: ActorRef, startTime: Long = 0L)
+
+ /** Notify the subscriber that master actor list has been updated */
+ case class MasterListUpdated(masters: List[MasterNode])
+
+ object MasterInfo {
+ // Placeholder value carrying a null master reference and the default start time.
+ def empty: MasterInfo = MasterInfo(null)
+ }
+
+ // Pair of total and currently available slot counts.
+ case class SlotStatus(totalSlots: Int, availableSlots: Int)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
new file mode 100644
index 0000000..623e3ff
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.ActorRef
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.scheduler.Relaxation._
+import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.mutable
+
+/**
+ * Assign resource to application based on the priority of the application.
+ *
+ * Pending requests are kept in a priority queue; whenever a request arrives the queue is
+ * drained against a snapshot of the known worker resources.
+ */
+class PriorityScheduler extends Scheduler {
+ private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
+
+ /**
+ * Orders requests by priority id first; for equal priorities the request with the
+ * smaller time stamp compares as greater.
+ */
+ def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
+ override def compare(x: PendingRequest, y: PendingRequest): Int = {
+ val byPriority = x.request.priority.id - y.request.priority.id
+ if (byPriority != 0) byPriority else y.timeStamp.compareTo(x.timeStamp)
+ }
+ }
+
+ override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler
+
+ /**
+ * Dequeues pending requests while the amount allocated in this pass is below the total
+ * of the snapshot, replies to the AppMaster for every allocation and re-queues whatever
+ * could not be (fully) satisfied.
+ */
+ override def allocateResource(): Unit = {
+ val scheduleLater = mutable.ArrayBuffer.empty[PendingRequest]
+ val resourcesSnapShot = resources.clone()
+ val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
+ var allocated = Resource.empty
+
+ while (resourceRequests.nonEmpty && (allocated < totalResource)) {
+ val pending @ PendingRequest(appId, appMaster, request, timeStamp) =
+ resourceRequests.dequeue()
+ request.relaxation match {
+ case ANY =>
+ val allocations = allocateFairly(resourcesSnapShot, request)
+ val newAllocated = Resource(allocations.map(_.resource.slots).sum)
+ if (allocations.nonEmpty) {
+ appMaster ! ResourceAllocated(allocations.toArray)
+ }
+ if (newAllocated < request.resource) {
+ // Partially served: queue the remainder under the original time stamp.
+ val leftover = request.copy(
+ resource = request.resource - newAllocated,
+ executorNum = request.executorNum - allocations.length)
+ scheduleLater += PendingRequest(appId, appMaster, leftover, timeStamp)
+ }
+ allocated = allocated + newAllocated
+ case ONEWORKER =>
+ // NOTE(review): the comparison is strict, so a worker holding exactly the requested
+ // amount is not picked - confirm this is intended.
+ resourcesSnapShot.find { case (_, (_, free)) => free > request.resource } match {
+ case Some((workerId, (worker, free))) =>
+ allocated = allocated + request.resource
+ appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+ workerId)))
+ resourcesSnapShot.update(workerId, (worker, free - request.resource))
+ case None =>
+ scheduleLater += pending
+ }
+ case SPECIFICWORKER =>
+ // NOTE(review): strict comparison here as well - confirm.
+ resourcesSnapShot.get(request.workerId) match {
+ case Some((worker, free)) if free > request.resource =>
+ appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+ request.workerId)))
+ allocated = allocated + request.resource
+ resourcesSnapShot.update(request.workerId, (worker, free - request.resource))
+ case _ =>
+ scheduleLater += pending
+ }
+ }
+ }
+ scheduleLater.foreach(later => resourceRequests.enqueue(later))
+ }
+
+ /** Queues an incoming resource request of an AppMaster and runs an allocation pass. */
+ def resourceRequestHandler: Receive = {
+ case RequestResource(appId, request) =>
+ LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
+ s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
+ val pending = new PendingRequest(appId, sender(), request, System.currentTimeMillis())
+ resourceRequests.enqueue(pending)
+ allocateResource()
+ }
+
+ /** Drops every pending request that belongs to the finished application. */
+ override def doneApplication(appId: Int): Unit = {
+ resourceRequests = resourceRequests.filter(_.appId != appId)
+ }
+
+ /**
+ * Spreads the requested slots over the workers holding the most free slots, one round
+ * per batch of executors, and deducts what was handed out from `resources`.
+ *
+ * @param resources worker resources to allocate from; updated in place
+ * @param request the request to serve
+ * @return the allocations made, possibly covering only part of the request
+ */
+ private def allocateFairly(
+ resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
+ : List[ResourceAllocation] = {
+ val workerNum = resources.size
+ var allocations = List.empty[ResourceAllocation]
+ var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
+ var remainingRequest = request.resource
+ var remainingExecutors = Math.min(request.executorNum, request.resource.slots)
+
+ while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
+ val executorsThisRound = Math.min(workerNum, remainingExecutors)
+ val toRequest =
+ Resource(remainingRequest.slots * executorsThisRound / remainingExecutors)
+
+ // Workers with the most free slots first, one worker per executor of this round.
+ val pickedResources = resources.toArray
+ .sortBy(_._2._2.slots)(Ordering[Int].reverse)
+ .take(executorsThisRound)
+
+ // One entry per free slot, interleaved across the picked workers.
+ val slotOwners = pickedResources.zipWithIndex.flatMap {
+ case ((workerId, (worker, resource)), index) =>
+ 0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
+ }.sortBy(_._2).map(_._1)
+
+ if (slotOwners.length < toRequest.slots) {
+ // Can not satisfy the user's requirements
+ totalAvailable = Resource.empty
+ } else {
+ val slotsPerWorker =
+ slotOwners.take(toRequest.slots).groupBy(identity).mapValues(_.length).toArray
+ slotsPerWorker.foreach { case ((workerId, worker), slots) =>
+ resources.update(workerId, (worker, resources(workerId)._2 - Resource(slots)))
+ allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
+ }
+ totalAvailable -= toRequest
+ remainingRequest -= toRequest
+ remainingExecutors -= executorsThisRound
+ }
+ }
+ allocations
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
new file mode 100644
index 0000000..ec9f1ba
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.{Actor, ActorRef}
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.WorkerTerminated
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+import scala.collection.mutable
+
+/**
+ * Scheduler schedule resource for different applications.
+ *
+ * Tracks the latest resource reported for every registered worker and leaves the
+ * allocation policy to subclasses through allocateResource() and doneApplication().
+ */
+abstract class Scheduler extends Actor {
+ val LOG: Logger = LogUtil.getLogger(getClass)
+ // Worker id -> (worker actor, resource last recorded for that worker).
+ protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
+
+ /** Handles worker registration, resource updates, worker termination and finished apps. */
+ def handleScheduleMessage: Receive = {
+ case WorkerRegistered(id, _) =>
+ val alreadyKnown = resources.contains(id)
+ if (!alreadyKnown) {
+ LOG.info(s"Worker $id added to the scheduler")
+ // A new worker starts with no resource until its first ResourceUpdate.
+ resources.put(id, (sender, Resource.empty))
+ }
+ case update@ResourceUpdate(worker, workerId, resource) =>
+ LOG.info(s"$update...")
+ resources.get(workerId) match {
+ case Some((_, previous)) =>
+ // Only a growing resource can make pending requests satisfiable.
+ val resourceReturned = resource > previous
+ resources.update(workerId, (worker, resource))
+ if (resourceReturned) {
+ allocateResource()
+ }
+ sender ! UpdateResourceSucceed
+ case None =>
+ sender ! UpdateResourceFailed(
+ s"ResourceUpdate failed! The worker $workerId has not been registered into master")
+ }
+ case WorkerTerminated(workerId) =>
+ // Removing an unknown worker id leaves the map unchanged.
+ resources -= workerId
+ case ApplicationFinished(appId) =>
+ doneApplication(appId)
+ }
+
+ /** Tries to serve pending requests from the currently known resources. */
+ def allocateResource(): Unit
+
+ /** Called when the application with the given id has finished. */
+ def doneApplication(appId: Int): Unit
+}
+
+/** Messages and data types shared by Scheduler implementations. */
+object Scheduler {
+ // A queued resource request: the requesting application, the actor that receives the
+ // allocations, the request itself and the time it was queued.
+ case class PendingRequest(
+ appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
+
+ // Tells the scheduler that an application is done; handled by calling doneApplication(appId).
+ case class ApplicationFinished(appId: Int)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
new file mode 100644
index 0000000..3d5b0af
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.{LogUtil, RichProcess, Util}
+import org.slf4j.Logger
+
+/** Launcher to start an executor process */
+class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ /**
+ * Logs the executor id together with its class path, then starts the process through
+ * Util.startProcess. appId, resource and config are not used by this launcher.
+ */
+ override def createProcess(
+ appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
+ classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
+
+ val joinedClassPath = classPath.mkString(File.pathSeparator)
+ LOG.info(s"Launch executor $executorId, classpath: $joinedClassPath")
+ Util.startProcess(options, classPath, mainClass, arguments)
+ }
+
+ /** This launcher has nothing to clean up. */
+ override def cleanProcess(appId: Int, executorId: Int): Unit = {}
+}