You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:55 UTC
[45/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
index 01d3474..00bd408 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
@@ -18,8 +18,13 @@
package io.gearpump.cluster.appmaster
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
import akka.actor._
import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
import io.gearpump.TestProbeUtil._
import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
import io.gearpump.cluster._
@@ -27,27 +32,28 @@ import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironmentSpec.TestAppMasterEnv
import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.StartExecutorSystems
import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
implicit var system: ActorSystem = null
- override def beforeAll() = {
+ override def beforeAll(): Unit = {
system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
}
- override def afterAll() = {
- system.shutdown()
- system.awaitTermination()
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
- "MasterWithExecutorSystemProvider" should "forward request StartExecutorSystem to ExecutorSystemProvider" in {
+ "MasterWithExecutorSystemProvider" should
+ "forward request StartExecutorSystem to ExecutorSystemProvider" in {
val client = TestProbe()
val master = TestProbe()
val provider = TestProbe()
val providerProps: Props = provider
- val masterEnhanced = system.actorOf(Props(new MasterWithExecutorSystemProvider(master.ref, providerProps)))
+ val masterEnhanced = system.actorOf(Props(
+ new MasterWithExecutorSystemProvider(master.ref, providerProps)))
val start = StartExecutorSystems(null, null)
client.send(masterEnhanced, start)
@@ -76,7 +82,8 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before
}
"AppMasterRuntimeEnvironment" should "start appMaster when master is connected" in {
- val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = setupAppMasterRuntimeEnv
+ val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
+ setupAppMasterRuntimeEnv()
masterConnectionKeeper.send(runtimeEnv, MasterConnected)
appMaster.expectMsg(StartAppMaster)
@@ -84,7 +91,8 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before
"AppMasterRuntimeEnvironment" should "shutdown itself when master is stopped" in {
- val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = setupAppMasterRuntimeEnv
+ val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
+ setupAppMasterRuntimeEnv()
masterConnectionKeeper.send(runtimeEnv, MasterStopped)
val client = TestProbe()
@@ -94,7 +102,8 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before
"AppMasterRuntimeEnvironment" should "shutdown itself when appMaster is stopped" in {
- val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = setupAppMasterRuntimeEnv
+ val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
+ setupAppMasterRuntimeEnv()
val client = TestProbe()
client.watch(runtimeEnv)
@@ -102,13 +111,13 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before
client.expectTerminated(runtimeEnv)
}
- private def setupAppMasterRuntimeEnv: TestAppMasterEnv = {
- val appContext = AppMasterContext(0, null, null, null, null, null, null)
+ private def setupAppMasterRuntimeEnv(): TestAppMasterEnv = {
+ val appContext = AppMasterContext(0, null, null, null, null, null, null)
val app = AppDescription("app", "AppMasterClass", null, null)
val master = TestProbe()
val masterFactory = (_: AppId, _: MasterActorRef) => toProps(master)
val appMaster = TestProbe()
- val appMasterFactory = (_: AppMasterContext, _: AppDescription)=> toProps(appMaster)
+ val appMasterFactory = (_: AppMasterContext, _: AppDescription) => toProps(appMaster)
val masterConnectionKeeper = TestProbe()
val masterConnectionKeeperFactory =
(_: MasterActorRef, _: RegisterAppMaster, _: ListenerActorRef) =>
@@ -125,5 +134,7 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before
object AppMasterRuntimeEnvironmentSpec {
- case class TestAppMasterEnv(master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe, appMasterRuntimeEnv: ActorRef)
+ case class TestAppMasterEnv(
+ master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe,
+ appMasterRuntimeEnv: ActorRef)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
index dc8e624..3595960 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
@@ -18,24 +18,25 @@
package io.gearpump.cluster.appmaster
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
import akka.actor.{ActorSystem, Props}
import akka.testkit.TestProbe
import com.typesafe.config.ConfigValueFactory
-import io.gearpump.WorkerId
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
import io.gearpump.cluster.TestUtil
import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.Session
import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
import io.gearpump.util.Constants
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import scala.concurrent.duration._
-class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
implicit var system: ActorSystem = null
val workerId: WorkerId = WorkerId(0, 0L)
val appId = 0
@@ -45,15 +46,15 @@ class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndA
val launchExecutorSystemTimeout = 3000
val activeConfig = TestUtil.DEFAULT_CONFIG.
withValue(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS,
- ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout))
+ ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout))
- override def beforeAll() = {
+ override def beforeAll(): Unit = {
system = ActorSystem("test", activeConfig)
}
- override def afterAll() = {
- system.shutdown()
- system.awaitTermination()
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
it should "report success when worker launch the system successfully" in {
@@ -90,14 +91,15 @@ class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndA
client.expectTerminated(launcher)
}
- it should "report timeout when trying to start a executor system on worker, and worker doesn't response" in {
+ it should "report timeout when trying to start a executor system on worker, " +
+ "and worker doesn't response" in {
val client = TestProbe()
val worker = TestProbe()
val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1)))
client.watch(launcher)
val waitFor = launchExecutorSystemTimeout + 10000
- client.expectMsgType[LaunchExecutorSystemTimeout](waitFor milliseconds)
- client.expectTerminated(launcher, waitFor milliseconds)
+ client.expectMsgType[LaunchExecutorSystemTimeout](waitFor.milliseconds)
+ client.expectTerminated(launcher, waitFor.milliseconds)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
index 106d389..c2f6ee8 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
@@ -18,20 +18,22 @@
package io.gearpump.cluster.appmaster
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestProbe
-import io.gearpump.WorkerId
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
import io.gearpump.cluster.AppMasterToMaster.RequestResource
import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.{ExecutorJVMConfig, AppJar, TestUtil}
import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._
import io.gearpump.cluster.appmaster.ExecutorSystemSchedulerSpec.{ExecutorSystemLauncherStarted, MockExecutorSystemLauncher}
import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{AppJar, TestUtil}
import io.gearpump.jarstore.FilePath
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import scala.concurrent.duration._
class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
val appId = 0
@@ -43,7 +45,7 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA
val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig)
implicit var system: ActorSystem = null
- var worker:TestProbe = null
+ var worker: TestProbe = null
var workerInfo: WorkerInfo = null
var masterProxy: TestProbe = null
var launcher: TestProbe = null
@@ -58,20 +60,21 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA
client = TestProbe()
val scheduler = system.actorOf(
- Props(new ExecutorSystemScheduler(appId, masterProxy.ref,
- (appId: Int, session:Session) => Props(new MockExecutorSystemLauncher(launcher, session)))))
+ Props(new ExecutorSystemScheduler(appId, masterProxy.ref, (appId: Int, session: Session) => {
+ Props(new MockExecutorSystemLauncher(launcher, session))
+ })))
client.send(scheduler, start)
masterProxy.expectMsg(RequestResource(appId, resourceRequest))
}
override def afterEach(): Unit = {
- system.shutdown()
- system.awaitTermination()
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
private def launcherStarted(launcher: TestProbe): Option[ExecutorSystemLauncherStarted] = {
- val launcherStarted = launcher.receiveOne(15 seconds)
+ val launcherStarted = launcher.receiveOne(15.seconds)
launcherStarted match {
case start: ExecutorSystemLauncherStarted => Some(start)
@@ -91,16 +94,18 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA
launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
val executorSystemProbe = TestProbe()
- val executorSystem = ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo)
+ val executorSystem =
+ ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo)
launcher.reply(LaunchExecutorSystemSuccess(executorSystem, session))
client.expectMsg(ExecutorSystemStarted(executorSystem, Some(mockJar)))
}
it should "report failure when resource cannot be allocated" in {
- client.expectMsg(30 seconds, StartExecutorSystemTimeout)
+ client.expectMsg(30.seconds, StartExecutorSystemTimeout)
}
- it should "schedule new resouce on new worker when target worker reject creating executor system" in {
+ it should "schedule new resouce on new worker " +
+ "when target worker reject creating executor system" in {
masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
@@ -110,7 +115,8 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA
masterProxy.expectMsg(RequestResource(appId, resourceRequest))
}
- it should "report failure when resource is allocated, but timeout when starting the executor system" in {
+ it should "report failure when resource is allocated, but timeout " +
+ "when starting the executor system" in {
masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
index 9221ca3..3272b99 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
@@ -18,51 +18,50 @@
package io.gearpump.cluster.appmaster
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.TestProbe
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.MasterConnected
-import io.gearpump.cluster.master.MasterProxy.WatchMaster
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._
+import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, _}
import io.gearpump.cluster.appmaster.MasterConnectionKeeperSpec.ConnectionKeeperTestEnv
-import io.gearpump.cluster.master.MasterProxy
import io.gearpump.cluster.master.MasterProxy.WatchMaster
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import scala.concurrent.duration._
-
-class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
implicit var system: ActorSystem = null
val register = RegisterAppMaster(null, null)
val appId = 0
- override def beforeAll: Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- }
+ override def beforeAll(): Unit = {
+ system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+ }
- override def afterAll: Unit = {
- system.shutdown()
- system.awaitTermination()
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
private def startMasterConnectionKeeper: ConnectionKeeperTestEnv = {
val statusChangeSubscriber = TestProbe()
val master = TestProbe()
- val keeper = system.actorOf(Props(new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref)))
+ val keeper = system.actorOf(Props(
+ new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref)))
statusChangeSubscriber.watch(keeper)
master.expectMsgType[WatchMaster]
- //master is alive, response to RegisterAppMaster
+ // Master is alive and responds to RegisterAppMaster
master.expectMsgType[RegisterAppMaster]
master.reply(AppMasterRegistered(appId))
- //notify listener that master is alive
+ // Notify listener that master is alive
statusChangeSubscriber.expectMsg(MasterConnected)
ConnectionKeeperTestEnv(master, keeper, statusChangeSubscriber)
}
@@ -75,23 +74,23 @@ class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndA
import io.gearpump.cluster.master.MasterProxy.MasterRestarted
val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper
- //master is restarted
+ // Master is restarted
master.send(keeper, MasterRestarted)
master.expectMsgType[RegisterAppMaster]
master.reply(AppMasterRegistered(appId))
masterChangeListener.expectMsg(MasterConnected)
- //Recovery from Master restart is transparent to listener
+ // Recovery from Master restart is transparent to listener
masterChangeListener.expectNoMsg()
}
it should "notify listener and then shutdown itself when master is dead" in {
val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper
- //master is dead
+ // Master is dead
master.send(keeper, MasterStopped)
- //keeper should tell the listener that master is stopped before shutting down itself
+ // Keeper should tell the listener that master is stopped before shutting down itself
masterChangeListener.expectMsg(MasterStopped)
masterChangeListener.expectTerminated(keeper)
}
@@ -100,18 +99,20 @@ class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndA
val statusChangeSubscriber = TestProbe()
val master = TestProbe()
- //keeper will register to master by sending RegisterAppMaster
- val keeper = system.actorOf(Props(new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref)))
+ // MasterConnectionKeeper registers itself with the master by sending RegisterAppMaster
+ val keeper = system.actorOf(Props(new MasterConnectionKeeper(register,
+ master.ref, statusChangeSubscriber.ref)))
- //master doesn't reply to keeper,
+ // Master doesn't reply to the keeper.
statusChangeSubscriber.watch(keeper)
- //timeout, keeper notify listener, and then make suicide
- statusChangeSubscriber.expectMsg(60 seconds, MasterStopped)
- statusChangeSubscriber.expectTerminated(keeper, 60 seconds)
+ // On timeout, the keeper notifies the listener and then shuts itself down
+ statusChangeSubscriber.expectMsg(60.seconds, MasterStopped)
+ statusChangeSubscriber.expectTerminated(keeper, 60.seconds)
}
}
object MasterConnectionKeeperSpec {
- case class ConnectionKeeperTestEnv(master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe)
+ case class ConnectionKeeperTestEnv(
+ master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala b/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
index f08e368..7544f9a 100644
--- a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
@@ -30,7 +30,7 @@ class ArgumentParserSpec extends FlatSpec with Matchers {
"opt2" -> CLIOption[Any]("", required = true))
}
- val result = parser.parse(Array("-flag" , "-opt1", "1","-opt2", "2", "arg1", "arg2"))
+ val result = parser.parse(Array("-flag", "-opt1", "1", "-opt2", "2", "arg1", "arg2"))
assert(result.getBoolean("flag"))
assert(result.getInt("opt1") == 1)
assert(result.getString("opt1") == "1")
@@ -47,7 +47,7 @@ class ArgumentParserSpec extends FlatSpec with Matchers {
"opt1" -> CLIOption[Any]("", required = true))
}
- val result = parser.parse(Array("-opt1", "1","xx.MainClass", "-opt2", "2"))
+ val result = parser.parse(Array("-opt1", "1", "xx.MainClass", "-opt2", "2"))
assert(result.getInt("opt1") == 1)
assert(result.remainArgs.length == 3)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
index 8ff2ea1..2a8dba1 100644
--- a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,24 +18,27 @@
package io.gearpump.cluster.master
+import scala.util.Success
+
import akka.actor._
import akka.testkit.TestProbe
-import io.gearpump.WorkerId
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.AppMasterToWorker.{ShutdownExecutor, LaunchExecutor}
+import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
+import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster.{TestUtil, MasterHarness}
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.scheduler.{ResourceRequest, Resource, ResourceAllocation}
+import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{MasterHarness, TestUtil}
import io.gearpump.util.ActorSystemBooter._
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import scala.util.Success
-class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+class AppMasterLauncherSpec extends FlatSpec with Matchers
+ with BeforeAndAfterEach with MasterHarness {
- override def config = TestUtil.DEFAULT_CONFIG
+ override def config: Config = TestUtil.DEFAULT_CONFIG
val appId = 1
val executorId = 2
@@ -45,7 +48,7 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa
var watcher: TestProbe = null
var appMasterLauncher: ActorRef = null
- override def beforeEach() = {
+ override def beforeEach(): Unit = {
startActorSystem()
master = createMockMaster()
client = TestProbe()(getActorSystem)
@@ -55,12 +58,13 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa
TestUtil.dummyApp, None, "username", master.ref, Some(client.ref)))
watcher watch appMasterLauncher
master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
- val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
+ val resource = ResourceAllocated(
+ Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
master.reply(resource)
worker.expectMsgType[LaunchExecutor]
}
- override def afterEach() = {
+ override def afterEach(): Unit = {
shutdownActorSystem()
}
@@ -79,7 +83,8 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa
worker.reply(ExecutorLaunchRejected(""))
master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
- val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
+ val resource = ResourceAllocated(
+ Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
master.reply(resource)
worker.expectMsgType[LaunchExecutor]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
index c1d6144..99cfc37 100644
--- a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
@@ -18,11 +18,11 @@
package io.gearpump.cluster.master
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
import io.gearpump.cluster.appmaster.ApplicationState
-import io.gearpump.cluster.{AppJar, AppDescription}
-import org.scalatest.{BeforeAndAfterEach, Matchers, FlatSpec}
-class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
"ApplicationState" should "check equal with respect to only appId and attemptId" in {
val stateA = ApplicationState(0, "application0", 0, null, null, null, "A")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
index 690d8ad..b007120 100644
--- a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
@@ -18,8 +18,7 @@
package io.gearpump.cluster.master
-//TODO
class MasterProxySpec {
- // master proxy will retry to find the master
+ // Master proxy retries multiple times to find the master
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
index 972f9d5..e4122da 100644
--- a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
@@ -19,5 +19,4 @@
package io.gearpump.cluster.master
class MasterSpec {
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
index 1602d10..3b3265f 100644
--- a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
+++ b/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
@@ -18,13 +18,14 @@
package io.gearpump.metrics
-import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
import org.mockito.Matchers._
import org.mockito.Mockito._
-import org.scalatest.{FlatSpec, Matchers}
import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
-class MetricsSpec extends FlatSpec with Matchers with MockitoSugar{
+import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
+
+class MetricsSpec extends FlatSpec with Matchers with MockitoSugar {
"Counter" should "handle sampleRate == 1" in {
@@ -83,7 +84,6 @@ class MetricsSpec extends FlatSpec with Matchers with MockitoSugar{
verify(mockBaseHistogram, times(1)).update(6L)
}
-
"Meter" should "handle sampleRate == 1" in {
val mockBaseMeter = mock[CodaHaleMeter]
@@ -101,7 +101,7 @@ class MetricsSpec extends FlatSpec with Matchers with MockitoSugar{
val mockBaseMeter = mock[CodaHaleMeter]
- val meter = new Meter("m",mockBaseMeter, 2)
+ val meter = new Meter("m", mockBaseMeter, 2)
meter.mark(1)
meter.mark(3)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
index d5a3c18..9509d94 100644
--- a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
+++ b/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
@@ -18,11 +18,12 @@
package io.gearpump.partitioner
-import io.gearpump.Message
import org.scalatest.{FlatSpec, Matchers}
-class PartitionerSpec extends FlatSpec with Matchers {
- val NUM = 10
+import io.gearpump.Message
+
+class PartitionerSpec extends FlatSpec with Matchers {
+ val NUM = 10
"HashPartitioner" should "hash same key to same slots" in {
val partitioner = new HashPartitioner
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala b/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
index c64cfa7..c38a555 100644
--- a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
+++ b/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,20 @@
package io.gearpump.security
-import akka.actor.ActorSystem
-import io.gearpump.cluster.TestUtil
-import io.gearpump.security.Authenticator.AuthenticationResult
-import org.scalatest.{Matchers, FlatSpec}
-
import scala.concurrent.Await
import scala.concurrent.duration._
-class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers {
+import akka.actor.ActorSystem
+import org.scalatest.{FlatSpec, Matchers}
+
+import io.gearpump.cluster.TestUtil
+
+class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers {
it should "authenticate correctly" in {
val config = TestUtil.UI_CONFIG
implicit val system = ActorSystem("ConfigFileBasedAuthenticatorSpec", config)
implicit val ec = system.dispatcher
- val timeout = 30 seconds
+ val timeout = 30.seconds
val authenticator = new ConfigFileBasedAuthenticator(config)
val guest = Await.result(authenticator.authenticate("guest", "guest", ec), timeout)
@@ -48,7 +48,7 @@ class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers {
assert(failedGuest == Authenticator.UnAuthenticated)
assert(failedAdmin == Authenticator.UnAuthenticated)
- system.shutdown()
- system.awaitTermination()
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala b/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
index 69abb80..4a3963b 100644
--- a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
+++ b/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,7 @@
package io.gearpump.security
-import org.scalatest.{Matchers, FlatSpec}
+import org.scalatest.{FlatSpec, Matchers}
class PasswordUtilSpec extends FlatSpec with Matchers {
@@ -28,11 +28,10 @@ class PasswordUtilSpec extends FlatSpec with Matchers {
val digest1 = PasswordUtil.hash(password)
val digest2 = PasswordUtil.hash(password)
- // we will use different salt each time, thus
- // creating different hash.
+ // Uses different salt each time, thus creating different hash.
assert(digest1 != digest2)
- // both are valid hash.
+ // Both are valid hash.
assert(PasswordUtil.verify(password, digest1))
assert(PasswordUtil.verify(password, digest2))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
index c18f4d5..3ed6ffa 100644
--- a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
+++ b/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
@@ -18,16 +18,19 @@
package io.gearpump.serializer
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
import akka.actor.{ActorSystem, ExtendedActorSystem}
-import io.gearpump.esotericsoftware.kryo.io.{Input, Output}
-import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.serializer.SerializerSpec._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
-import scala.collection.JavaConverters._
+import io.gearpump.cluster.TestUtil
+import io.gearpump.esotericsoftware.kryo.io.{Input, Output}
+import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import io.gearpump.serializer.SerializerSpec._
class SerializerSpec extends FlatSpec with Matchers with MockitoSugar {
val config = ConfigFactory.empty.withValue("gearpump.serializers",
@@ -56,8 +59,8 @@ class SerializerSpec extends FlatSpec with Matchers with MockitoSugar {
val anotherA = serializer.deserialize(bytes)
assert(anotherA.isInstanceOf[ClassA])
- system.shutdown()
- system.awaitTermination()
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala b/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
index 8582f29..71b4218 100644
--- a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
+++ b/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
@@ -15,15 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.transport
import java.io.{DataInput, DataOutput}
import io.gearpump.transport.MockTransportSerializer.NettyMessage
import io.gearpump.transport.netty.ITransportMessageSerializer
-import org.jboss.netty.buffer.ChannelBuffer
-class MockTransportSerializer extends ITransportMessageSerializer{
+class MockTransportSerializer extends ITransportMessageSerializer {
override def getLength(obj: scala.Any): Int = 4
override def serialize(dataOutput: DataOutput, transportMessage: scala.Any): Unit = {
@@ -38,6 +38,6 @@ class MockTransportSerializer extends ITransportMessageSerializer{
}
}
-object MockTransportSerializer{
- case class NettyMessage(num : Int)
+object MockTransportSerializer {
+ case class NettyMessage(num: Int)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/transport/NettySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/transport/NettySpec.scala b/core/src/test/scala/io/gearpump/transport/NettySpec.scala
index a81e21d..6caf357 100644
--- a/core/src/test/scala/io/gearpump/transport/NettySpec.scala
+++ b/core/src/test/scala/io/gearpump/transport/NettySpec.scala
@@ -19,18 +19,19 @@
package io.gearpump.transport
import java.util.concurrent.TimeUnit
+import scala.concurrent.Await
+import scala.concurrent.duration._
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.TestProbe
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
import io.gearpump.cluster.TestUtil
import io.gearpump.transport.MockTransportSerializer.NettyMessage
import io.gearpump.transport.netty.{Context, TaskMessage}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
import io.gearpump.util.Util
-import scala.concurrent.duration._
-
class NettySpec extends FlatSpec with Matchers with MockitoSugar {
"Netty Transport" should "send and receive message correctly " in {
@@ -39,7 +40,7 @@ class NettySpec extends FlatSpec with Matchers with MockitoSugar {
val context = new Context(system, conf)
val serverActor = TestProbe()(system)
- val port = Util.findFreePort
+ val port = Util.findFreePort()
import system.dispatcher
system.scheduler.scheduleOnce(Duration(1, TimeUnit.SECONDS)) {
@@ -52,10 +53,10 @@ class NettySpec extends FlatSpec with Matchers with MockitoSugar {
val data = NettyMessage(0)
val msg = new TaskMessage(0, 1, 2, data)
client ! msg
- serverActor.expectMsg(15 seconds, data)
-
- context.close
- system.shutdown()
- system.awaitTermination()
+ serverActor.expectMsg(15.seconds, data)
+
+ context.close()
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala b/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
index d5068fb..13530ed 100644
--- a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
@@ -18,22 +18,24 @@
package io.gearpump.util
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestProbe
-import io.gearpump.cluster.TestUtil
-import io.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.util.ActorSystemBooter._
-import io.gearpump.util.ActorSystemBooterSpec._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
+import io.gearpump.cluster.TestUtil
+import io.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem, _}
+import io.gearpump.util.ActorSystemBooterSpec._
+
class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
"ActorSystemBooter" should "report its address back" in {
val boot = bootSystem()
boot.prob.expectMsgType[RegisterActorSystem]
- boot.shutdown
+ boot.shutdown()
}
"ActorSystemBooter" should "terminate itself when parent actor dies" in {
@@ -44,9 +46,9 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
boot.prob.reply(ActorSystemRegistered(boot.prob.ref))
boot.prob.reply(BindLifeCycle(dummy))
boot.host.stop(dummy)
- val terminated = retry(5)(boot.bootedSystem.isTerminated)
+ val terminated = retry(5)(boot.bootedSystem.whenTerminated.isCompleted)
assert(terminated)
- boot.shutdown
+ boot.shutdown()
}
"ActorSystemBooter" should "create new actor" in {
@@ -59,10 +61,10 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
boot.prob.reply(CreateActor(Props(classOf[AcceptZeroArguments]), "zero"))
boot.prob.expectMsgType[ActorCreated]
- boot.shutdown
+ boot.shutdown()
}
- private def bootSystem() : Boot = {
+ private def bootSystem(): Boot = {
val booter = ActorSystemBooter(TestUtil.DEFAULT_CONFIG)
val system = ActorSystem("reportback", TestUtil.DEFAULT_CONFIG)
@@ -75,16 +77,16 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
Boot(system, receiver, bootSystem)
}
- case class Boot(host : ActorSystem, prob : TestProbe, bootedSystem : ActorSystem) {
- def shutdown = {
- host.shutdown()
- bootedSystem.shutdown()
- host.awaitTermination()
- bootedSystem.awaitTermination()
+ case class Boot(host: ActorSystem, prob: TestProbe, bootedSystem: ActorSystem) {
+ def shutdown(): Unit = {
+ host.terminate()
+ bootedSystem.terminate()
+ Await.result(host.whenTerminated, Duration.Inf)
+ Await.result(bootedSystem.whenTerminated, Duration.Inf)
}
}
- def retry(seconds: Int)(fn: => Boolean) : Boolean = {
+ def retry(seconds: Int)(fn: => Boolean): Boolean = {
val result = fn
if (result) {
result
@@ -97,19 +99,19 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
object ActorSystemBooterSpec {
class Dummy extends Actor {
- def receive : Receive = {
+ def receive: Receive = {
case _ =>
}
}
class AcceptZeroArguments extends Actor {
- def receive : Receive = {
+ def receive: Receive = {
case _ =>
}
}
- class AcceptThreeArguments(a : Int, b : Int, c : Int) extends Actor {
- def receive : Receive = {
+ class AcceptThreeArguments(a: Int, b: Int, c: Int) extends Actor {
+ def receive: Receive = {
case _ =>
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala b/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
index ec72810..6ab5a2f 100644
--- a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,23 +18,23 @@
package io.gearpump.util
+import org.scalatest.FlatSpec
+
import io.gearpump.transport.HostPort
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FlatSpec}
-class ActorUtilSpec extends FlatSpec {
- "masterActorPath" should "construct the ActorPath from HostPort" in {
- import Constants.MASTER
+class ActorUtilSpec extends FlatSpec {
+ "masterActorPath" should "construct the ActorPath from HostPort" in {
+ import io.gearpump.util.Constants.MASTER
- val host = "127.0.0.1"
- val port = 3000
- val master = HostPort("127.0.0.1", 3000)
- val masterPath = ActorUtil.getMasterActorPath(master)
- assert(masterPath.address.port == Some(port))
- assert(masterPath.address.system == MASTER)
- assert(masterPath.address.host == Some(host))
- assert(masterPath.address.protocol == "akka.tcp")
- assert(masterPath.toStringWithoutAddress == s"/user/$MASTER")
- }
+ val host = "127.0.0.1"
+ val port = 3000
+ val master = HostPort("127.0.0.1", 3000)
+ val masterPath = ActorUtil.getMasterActorPath(master)
+ assert(masterPath.address.port == Some(port))
+ assert(masterPath.address.system == MASTER)
+ assert(masterPath.address.host == Some(host))
+ assert(masterPath.address.protocol == "akka.tcp")
+ assert(masterPath.toStringWithoutAddress == s"/user/$MASTER")
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala b/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
index 07e8121..0c798f3 100644
--- a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
@@ -19,16 +19,20 @@
package io.gearpump.util
import java.io.File
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
import akka.actor.ActorSystem
-import io.gearpump.cluster.{ClusterConfigSource, ClusterConfig, UserConfig}
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
-class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar {
+import io.gearpump.cluster.{ClusterConfig, ClusterConfigSource, UserConfig}
+
+class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar {
"Typesafe Cluster Configs" should "follow the override rules" in {
- val conf = """
+ val conf =
+ """
gearpump {
gear = "gearpump"
}
@@ -40,7 +44,7 @@ class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar {
conf = "worker"
}
conf = "base"
- """
+ """
val file = File.createTempFile("test", ".conf")
FileUtils.write(file, conf)
@@ -68,8 +72,7 @@ class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar {
implicit val system = ActorSystem("forSerialization")
-
- val map = Map[String,String]("key1"->"1", "key2"->"value2")
+ val map = Map[String, String]("key1" -> "1", "key2" -> "value2")
val user = new UserConfig(map)
.withLong("key3", 2L)
@@ -86,11 +89,11 @@ class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar {
val data = new ConfigsSpec.Data(3)
assert(data == user.withValue("data", data).getValue[ConfigsSpec.Data]("data").get)
- system.shutdown()
- system.awaitTermination()
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
}
-object ConfigsSpec{
+object ConfigsSpec {
case class Data(value: Int)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
index 1c954d5..30e42c7 100644
--- a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
@@ -16,12 +16,13 @@
* limitations under the License.
*/
-
package io.gearpump.util
import java.io.File
import java.util
+
import org.scalatest.FlatSpec
+
import io.gearpump.google.common.io.Files
class FileUtilsSpec extends FlatSpec {
@@ -43,7 +44,7 @@ class FileUtilsSpec extends FlatSpec {
val file = File.createTempFile("fileutilspec", ".test")
val bytes = TXT.toCharArray.map(_.toByte)
FileUtils.writeByteArrayToFile(file, bytes)
- util.Arrays.equals(bytes,FileUtils.readFileToByteArray(file))
+ util.Arrays.equals(bytes, FileUtils.readFileToByteArray(file))
file.delete()
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/GraphSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/GraphSpec.scala b/core/src/test/scala/io/gearpump/util/GraphSpec.scala
index 0028446..20eab6d 100644
--- a/core/src/test/scala/io/gearpump/util/GraphSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/GraphSpec.scala
@@ -20,9 +20,9 @@ package io.gearpump.util
import org.scalacheck.Gen
import org.scalatest.prop.PropertyChecks
-import org.scalatest.{PropSpec, Matchers}
+import org.scalatest.{Matchers, PropSpec}
-import io.gearpump.util.Graph.{Path, Node}
+import io.gearpump.util.Graph.{Node, Path}
class GraphSpec extends PropSpec with PropertyChecks with Matchers {
@@ -33,7 +33,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
property("Graph with no edges should be built correctly") {
val vertexSet = Set("A", "B", "C")
- val graph = Graph(vertexSet.toSeq.map(Node):_ *)
+ val graph = Graph(vertexSet.toSeq.map(Node): _*)
graph.vertices.toSet shouldBe vertexSet
}
@@ -107,7 +107,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
val levelMap = graph.vertexHierarchyLevelMap()
- //check whether the rule holds: : if vertex A -> B, then level(A) < level(B)
+ // Check whether the rule holds: if vertex A -> B, then level(A) < level(B)
levelMap("A") < levelMap("B")
levelMap("A") < levelMap("C")
levelMap("B") < levelMap("C")
@@ -178,8 +178,8 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
assert(graph.hasCycle())
}
- property("topologicalOrderIterator " +
- "and topologicalOrderWithCirclesIterator method should return equal order of graph with no circle") {
+ property("topologicalOrderIterator and topologicalOrderWithCirclesIterator method should " +
+ "return equal order of graph with no circle") {
val graph = Graph(1 ~> 2 ~> 3, 4 ~> 2, 2 ~> 5)
val topoNoCircles = graph.topologicalOrderIterator
val topoWithCircles = graph.topologicalOrderWithCirclesIterator
@@ -188,9 +188,10 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
}
property("Topological sort of graph with circles should work properly") {
- val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7, 4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10)
+ val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7,
+ 4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10)
val topoWithCircles = graph.topologicalOrderWithCirclesIterator
- val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6 ,5, 7, 10, 9)
+ val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6, 5, 7, 10, 9)
assert(trueTopoWithCircles.zip(topoWithCircles).forall(x => x._1 == x._2))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala b/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
index 213bd96..ef362ff 100644
--- a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,15 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.util
+import scala.concurrent.duration._
+
import akka.actor._
import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.TestUtil
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import org.slf4j.Logger
-import scala.concurrent.duration._
+
+import io.gearpump.cluster.TestUtil
class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll {
@@ -40,7 +42,7 @@ class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with I
val testActorRef = TestActorRef(Props(classOf[TestActor], mockActor.ref))
val testActor = testActorRef.underlyingActor.asInstanceOf[TestActor]
testActor.sendMsgToIgnore()
- mockActor.expectMsg(30 seconds, MessageTimeOut)
+ mockActor.expectMsg(30.seconds, MessageTimeOut)
}
}
}
@@ -60,7 +62,7 @@ class TestActor(mock: ActorRef) extends Actor with TimeOutScheduler {
def sendMsgToIgnore(): Unit = {
sendMsgWithTimeOutCallBack(target, Echo, 2000, sendMsgTimeOut())
}
-
+
private def sendMsgTimeOut(): Unit = {
mock ! MessageTimeOut
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/UtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/UtilSpec.scala b/core/src/test/scala/io/gearpump/util/UtilSpec.scala
index 8d614fe..b5bde04 100644
--- a/core/src/test/scala/io/gearpump/util/UtilSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/UtilSpec.scala
@@ -18,17 +18,18 @@
package io.gearpump.util
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Util._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
+import io.gearpump.transport.HostPort
+import io.gearpump.util.Util._
+
class UtilSpec extends FlatSpec with Matchers with MockitoSugar {
it should "work" in {
- assert(findFreePort.isSuccess)
+ assert(findFreePort().isSuccess)
- assert(randInt != randInt)
+ assert(randInt() != randInt())
val hosts = parseHostList("host1:1,host2:2")
assert(hosts(1) == HostPort("host2", 2))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git a/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService b/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
index f0e0c5c..d226af9 100644
--- a/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
+++ b/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
@@ -1,2 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
io.gearpump.jarstore.local.LocalJarStoreService
io.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
index ac942ed..420f1b6 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,21 +18,34 @@
package io.gearpump.cluster
import akka.actor.ActorRef
-import io.gearpump.WorkerId
+
import io.gearpump.cluster.master.Master.MasterInfo
import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.cluster.worker.WorkerId
/**
* Cluster Bootup Flow
*/
object WorkerToMaster {
+
+ /** When a worker is started, it sends RegisterNewWorker */
case object RegisterNewWorker
+
+ /** When a worker loses connection with master, it tries to register again with its old Id. */
case class RegisterWorker(workerId: WorkerId)
+
+ /** Worker is responsible for broadcasting its current status to master */
case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
}
object MasterToWorker {
- case class WorkerRegistered(workerId : WorkerId, masterInfo: MasterInfo)
- case class UpdateResourceFailed(reason : String = null, ex: Throwable = null)
+
+ /** Master confirms the reception of RegisterNewWorker or RegisterWorker */
+ case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
+
+ /** Worker has not received reply from master */
+ case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
+
+ /** Master is synced with worker on resource slots managed by current worker */
case object UpdateResourceSucceed
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
index 6be52b2..53da645 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,19 @@
package io.gearpump.cluster.embedded
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
import akka.actor.{ActorRef, ActorSystem, Props}
-import com.typesafe.config.{ConfigValueFactory, Config}
+import com.typesafe.config.{Config, ConfigValueFactory}
+
import io.gearpump.cluster.ClusterConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.master.{Master => MasterActor}
import io.gearpump.cluster.worker.{Worker => WorkerActor}
-import io.gearpump.util.{LogUtil, Constants, Util, ActorUtil}
-import io.gearpump.util.Constants.{GEARPUMP_METRIC_ENABLED, GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, MASTER, GEARPUMP_CLUSTER_MASTERS}
-import scala.collection.JavaConverters._
-
+import io.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
+import io.gearpump.util.{LogUtil, Util}
/**
* Create a in-process cluster with single worker
@@ -41,8 +44,8 @@ class EmbeddedCluster(inputConfig: Config) {
private val LOG = LogUtil.getLogger(getClass)
- def start: Unit = {
- val port = Util.findFreePort.get
+ def start(): Unit = {
+ val port = Util.findFreePort().get
val akkaConf = getConfig(inputConfig, port)
_config = akkaConf
val system = ActorSystem(MASTER, akkaConf)
@@ -64,10 +67,13 @@ class EmbeddedCluster(inputConfig: Config) {
private def getConfig(inputConfig: Config, port: Int): Config = {
val config = inputConfig.
withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
- withValue(GEARPUMP_CLUSTER_MASTERS, ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
- withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, ConfigValueFactory.fromAnyRef(true)).
+ withValue(GEARPUMP_CLUSTER_MASTERS,
+ ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
+ withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
+ ConfigValueFactory.fromAnyRef(true)).
withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
- withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
+ withValue("akka.actor.provider",
+ ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
config
}
@@ -75,14 +81,14 @@ class EmbeddedCluster(inputConfig: Config) {
ClientContext(_config, _system, _master)
}
- def stop: Unit = {
+ def stop(): Unit = {
_system.stop(_master)
- _system.shutdown()
- _system.awaitTermination()
+ _system.terminate()
+ Await.result(_system.whenTerminated, Duration.Inf)
}
}
-object EmbeddedCluster{
+object EmbeddedCluster {
def apply(): EmbeddedCluster = {
new EmbeddedCluster(ClusterConfig.master())
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
index 90f653c..68f778e 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,13 +20,13 @@ package io.gearpump.cluster.main
import java.io.File
import java.net.{URL, URLClassLoader}
import java.util.jar.JarFile
+import scala.util.Try
-import io.gearpump.util.{Util, Constants}
-import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
import org.slf4j.Logger
-import scala.util.Try
+import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+/** Tool to submit an application jar to cluster */
object AppSubmitter extends AkkaApp with ArgumentsParser {
val LOG: Logger = LogUtil.getLogger(getClass)
@@ -35,60 +35,66 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
override val description = "Submit an application to Master by providing a jar"
override val options: Array[(String, CLIOption[Any])] = Array(
- "namePrefix" -> CLIOption[String]("<application name prefix>", required = false, defaultValue = Some("")),
+ "namePrefix" -> CLIOption[String]("<application name prefix>", required = false,
+ defaultValue = Some("")),
"jar" -> CLIOption("<application>.jar", required = true),
- "executors" -> CLIOption[Int]("number of executor to launch", required = false, defaultValue = Some(1)),
- "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)),
+ "executors" -> CLIOption[Int]("number of executor to launch", required = false,
+ defaultValue = Some(1)),
+ "verbose" -> CLIOption("<print verbose log on console>", required = false,
+ defaultValue = Some(false)),
// For document purpose only, OPTION_CONFIG option is not used here.
// OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None))
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
def main(akkaConf: Config, args: Array[String]): Unit = {
val config = parse(args)
- if (null == config) {
- return
- }
+ if (null != config) {
- val verbose = config.getBoolean("verbose")
- if (verbose) {
- LogUtil.verboseLogToConsole
- }
+ val verbose = config.getBoolean("verbose")
+ if (verbose) {
+ LogUtil.verboseLogToConsole()
+ }
- val jar = config.getString("jar")
+ val jar = config.getString("jar")
- // Set jar path to be submitted to cluster
- System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
- System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
+ // Set jar path to be submitted to cluster
+ System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
+ System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
- val namePrefix = config.getString("namePrefix")
- if (namePrefix.nonEmpty) {
- if (!Util.validApplicationName(namePrefix)) {
- throw new Exception(s"$namePrefix is not a valid prefix for an application name")
+ val namePrefix = config.getString("namePrefix")
+ if (namePrefix.nonEmpty) {
+ if (!Util.validApplicationName(namePrefix)) {
+ throw new Exception(s"$namePrefix is not a valid prefix for an application name")
+ }
+ System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
}
- System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
- }
- val jarFile = new java.io.File(jar)
+ val jarFile = new java.io.File(jar)
- //start main class
- if (!jarFile.exists()) {
- throw new Exception(s"jar $jar does not exist")
- }
+ // Start main class
+ if (!jarFile.exists()) {
+ throw new Exception(s"jar $jar does not exist")
+ }
- val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" + jarFile.getAbsolutePath)),
- Thread.currentThread().getContextClassLoader())
- val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
+ val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" +
+ jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader())
+ val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
- //set the context classloader as ActorSystem will use context classloader in precedence.
- Thread.currentThread().setContextClassLoader(classLoader)
- val clazz = classLoader.loadClass(main)
- val mainMethod = clazz.getMethod("main", classOf[Array[String]])
- mainMethod.invoke(null, arguments)
+ // Set to context classloader. ActorSystem pick context classloader in preference
+ Thread.currentThread().setContextClassLoader(classLoader)
+ val clazz = classLoader.loadClass(main)
+ val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+ mainMethod.invoke(null, arguments)
+ }
}
- private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader): (String, Array[String]) = {
- val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes.getValue("Main-Class")).getOrElse("")
+ private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader)
+ : (String, Array[String]) = {
+ val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes.
+ getValue("Main-Class")).getOrElse("")
+
if (remainArgs.length > 0 && Try(classLoader.loadClass(remainArgs(0))).isSuccess) {
(remainArgs(0), remainArgs.drop(1))
} else if (mainInManifest.nonEmpty) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
index 31b7492..4423727 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
*/
package io.gearpump.cluster.main
-import io.gearpump.util.{Constants, LogUtil}
import org.slf4j.Logger
-object Gear {
+import io.gearpump.util.{Constants, LogUtil}
+
+object Gear {
val OPTION_CONFIG = "conf"
@@ -29,12 +30,14 @@ object Gear {
val commands = Map("app" -> AppSubmitter, "kill" -> Kill,
"info" -> Info, "replay" -> Replay, "main" -> MainRunner)
- def usage: Unit = {
+ def usage(): Unit = {
val keys = commands.keys.toList.sorted
+ // scalastyle:off println
Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+ // scalastyle:on println
}
- def executeCommand(command : String, commandArgs : Array[String]) = {
+ private def executeCommand(command: String, commandArgs: Array[String]) = {
commands.get(command).map(_.main(commandArgs))
if (!commands.contains(command)) {
val allArgs = (command +: commandArgs.toList).toArray
@@ -42,15 +45,15 @@ object Gear {
}
}
- def main(inputArgs: Array[String]) = {
+ def main(inputArgs: Array[String]): Unit = {
val (configFile, args) = extractConfig(inputArgs)
if (configFile != null) {
- // set custom config file...
+ // Sets custom config file...
System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile)
}
if (args.length == 0) {
- usage
+ usage()
} else {
val command = args(0)
val commandArgs = args.drop(1)
@@ -62,7 +65,7 @@ object Gear {
var index = 0
var result = List.empty[String]
- var configFile:String = null
+ var configFile: String = null
while (index < inputArgs.length) {
val item = inputArgs(index)
if (item == s"-$OPTION_CONFIG") {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
index 878bcbf..4922690 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,13 @@
*/
package io.gearpump.cluster.main
+import org.slf4j.Logger
+
import io.gearpump.cluster.MasterToAppMaster.AppMastersData
import io.gearpump.cluster.client.ClientContext
import io.gearpump.util.{AkkaApp, LogUtil}
-import org.slf4j.Logger
-
-import scala.util.Try
+/** Tool to query master info */
object Info extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -31,11 +31,12 @@ object Info extends AkkaApp with ArgumentsParser {
override val options: Array[(String, CLIOption[Any])] = Array(
// For document purpose only, OPTION_CONFIG option is not used here.
// OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None))
-
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
override val description = "Query the Application list"
+ // scalastyle:off println
def main(akkaConf: Config, args: Array[String]): Unit = {
val client = ClientContext(akkaConf)
@@ -48,4 +49,5 @@ object Info extends AkkaApp with ArgumentsParser {
}
client.close()
}
+ // scalastyle:on println
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
index 195809c..3ce781f 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,12 +18,12 @@
package io.gearpump.cluster.main
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.util.{AkkaApp, LogUtil}
import org.slf4j.Logger
-import scala.util.Try
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.util.{AkkaApp, LogUtil}
+/** Tool to kill an App */
object Kill extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -32,20 +32,19 @@ object Kill extends AkkaApp with ArgumentsParser {
"appid" -> CLIOption("<application id>", required = true),
// For document purpose only, OPTION_CONFIG option is not used here.
// OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None))
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
override val description = "Kill an application with application Id"
def main(akkaConf: Config, args: Array[String]): Unit = {
val config = parse(args)
- if (null == config) {
- return
+ if (null != config) {
+ val client = ClientContext(akkaConf)
+ LOG.info("Client ")
+ client.shutdown(config.getInt("appid"))
+ client.close()
}
-
- val client = ClientContext(akkaConf)
- LOG.info("Client ")
- client.shutdown(config.getInt("appid"))
- client.close()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
index d6b2479..d5681df 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
package io.gearpump.cluster.main
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigValueFactory
+import org.slf4j.Logger
+
import io.gearpump.cluster.ClusterConfig
import io.gearpump.cluster.master.{Master => MasterActor}
import io.gearpump.cluster.worker.{Worker => WorkerActor}
import io.gearpump.util.Constants._
import io.gearpump.util.LogUtil.ProcessType
-import io.gearpump.util.{AkkaApp, ActorUtil, Constants, LogUtil, Util}
-import org.slf4j.Logger
-
-import scala.collection.JavaConversions._
+import io.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
object Local extends AkkaApp with ArgumentsParser {
override def akkaConfig: Config = ClusterConfig.master()
@@ -37,7 +40,8 @@ object Local extends AkkaApp with ArgumentsParser {
override val options: Array[(String, CLIOption[Any])] =
Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
- "workernum"-> CLIOption[Int]("<how many workers to start>", required = false, defaultValue = Some(2)))
+ "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
+ defaultValue = Some(2)))
override val description = "Start a local cluster"
@@ -49,23 +53,23 @@ object Local extends AkkaApp with ArgumentsParser {
}
val config = parse(args)
- if (null == config) {
- return
+ if (null != config) {
+ local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
}
- local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
}
- def local(workerCount : Int, sameProcess : Boolean, akkaConf: Config) : Unit = {
+ def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
if (sameProcess) {
LOG.info("Starting local in same process")
System.setProperty("LOCAL", "true")
}
- val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).toList.flatMap(Util.parseHostList)
+ val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+ .asScala.flatMap(Util.parseHostList)
val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
- if(masters.size != 1 && masters.head.host != local) {
- LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match with ${Constants.GEARPUMP_HOSTNAME}")
-
+ if (masters.size != 1 && masters.head.host != local) {
+ LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " +
+ s"with ${Constants.GEARPUMP_HOSTNAME}")
} else {
val hostPort = masters.head
@@ -80,7 +84,7 @@ object Local extends AkkaApp with ArgumentsParser {
system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
}
- system.awaitTermination()
+ Await.result(system.whenTerminated, Duration.Inf)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala b/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
index ea235ed..923a646 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,19 @@
package io.gearpump.cluster.main
-import io.gearpump.util.{LogUtil, AkkaApp}
-import io.gearpump.util.{AkkaApp, LogUtil}
import org.slf4j.Logger
-import scala.util.Try
+import io.gearpump.util.{AkkaApp, LogUtil}
+/** Tool to run any main class by providing a jar */
object MainRunner extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
override val options: Array[(String, CLIOption[Any])] = Array(
// For document purpose only, OPTION_CONFIG option is not used here.
// OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None))
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
def main(akkaConf: Config, args: Array[String]): Unit = {
val mainClazz = args(0)