You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:51 UTC
[41/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
deleted file mode 100644
index 00bd408..0000000
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor._
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.TestProbeUtil._
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
-import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironmentSpec.TestAppMasterEnv
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.StartExecutorSystems
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
-
-class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
- implicit var system: ActorSystem = null
-
- override def beforeAll(): Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- "MasterWithExecutorSystemProvider" should
- "forward request StartExecutorSystem to ExecutorSystemProvider" in {
-
- val client = TestProbe()
- val master = TestProbe()
- val provider = TestProbe()
- val providerProps: Props = provider
- val masterEnhanced = system.actorOf(Props(
- new MasterWithExecutorSystemProvider(master.ref, providerProps)))
-
- val start = StartExecutorSystems(null, null)
- client.send(masterEnhanced, start)
- provider.expectMsg(start)
-
- val anyOtherMessage = "any other message"
- client.send(masterEnhanced, anyOtherMessage)
- master.expectMsg(anyOtherMessage)
- system.stop(masterEnhanced)
- }
-
- "LazyStartAppMaster" should "forward command to appmaster when app master started" in {
-
- val appMaster = TestProbe()
- val appMasterProps: Props = appMaster
- val lazyAppMaster = system.actorOf(Props(new LazyStartAppMaster(appId = 0, appMasterProps)))
- val msg = "Some"
- lazyAppMaster ! msg
- lazyAppMaster ! StartAppMaster
- appMaster.expectMsg(msg)
-
- system.stop(appMaster.ref)
- val client = TestProbe()
- client.watch(lazyAppMaster)
- client.expectTerminated(lazyAppMaster)
- }
-
- "AppMasterRuntimeEnvironment" should "start appMaster when master is connected" in {
- val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
- setupAppMasterRuntimeEnv()
-
- masterConnectionKeeper.send(runtimeEnv, MasterConnected)
- appMaster.expectMsg(StartAppMaster)
- }
-
- "AppMasterRuntimeEnvironment" should "shutdown itself when master is stopped" in {
-
- val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
- setupAppMasterRuntimeEnv()
-
- masterConnectionKeeper.send(runtimeEnv, MasterStopped)
- val client = TestProbe()
- client.watch(runtimeEnv)
- client.expectTerminated(runtimeEnv)
- }
-
- "AppMasterRuntimeEnvironment" should "shutdown itself when appMaster is stopped" in {
-
- val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
- setupAppMasterRuntimeEnv()
-
- val client = TestProbe()
- client.watch(runtimeEnv)
- system.stop(appMaster.ref)
- client.expectTerminated(runtimeEnv)
- }
-
- private def setupAppMasterRuntimeEnv(): TestAppMasterEnv = {
- val appContext = AppMasterContext(0, null, null, null, null, null, null)
- val app = AppDescription("app", "AppMasterClass", null, null)
- val master = TestProbe()
- val masterFactory = (_: AppId, _: MasterActorRef) => toProps(master)
- val appMaster = TestProbe()
- val appMasterFactory = (_: AppMasterContext, _: AppDescription) => toProps(appMaster)
- val masterConnectionKeeper = TestProbe()
- val masterConnectionKeeperFactory =
- (_: MasterActorRef, _: RegisterAppMaster, _: ListenerActorRef) =>
- toProps(masterConnectionKeeper)
-
- val runtimeEnv = system.actorOf(
- Props(new AppMasterRuntimeEnvironment(
- appContext, app, List(master.ref.path), masterFactory,
- appMasterFactory, masterConnectionKeeperFactory)))
-
- TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv)
- }
-}
-
-object AppMasterRuntimeEnvironmentSpec {
-
- case class TestAppMasterEnv(
- master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe,
- appMasterRuntimeEnv: ActorRef)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
deleted file mode 100644
index 3595960..0000000
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigValueFactory
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.Session
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
-import io.gearpump.util.Constants
-
-class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
- implicit var system: ActorSystem = null
- val workerId: WorkerId = WorkerId(0, 0L)
- val appId = 0
- val executorId = 0
- val url = "akka.tcp://worker@127.0.0.1:3000"
- val session = Session(null, null)
- val launchExecutorSystemTimeout = 3000
- val activeConfig = TestUtil.DEFAULT_CONFIG.
- withValue(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS,
- ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout))
-
- override def beforeAll(): Unit = {
- system = ActorSystem("test", activeConfig)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- it should "report success when worker launch the system successfully" in {
- val worker = TestProbe()
- val client = TestProbe()
-
- val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
- client.watch(launcher)
- client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1)))
-
- worker.expectMsgType[LaunchExecutor]
- worker.reply(RegisterActorSystem(url))
-
- worker.expectMsgType[ActorSystemRegistered]
-
- client.expectMsgType[LaunchExecutorSystemSuccess]
- client.expectTerminated(launcher)
- }
-
- it should "report failure when worker refuse to launch the system explicitly" in {
- val worker = TestProbe()
- val client = TestProbe()
-
- val resource = Resource(4)
-
- val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
- client.watch(launcher)
- client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, resource))
-
- worker.expectMsgType[LaunchExecutor]
- worker.reply(ExecutorLaunchRejected())
-
- client.expectMsg(LaunchExecutorSystemRejected(resource, null, session))
- client.expectTerminated(launcher)
- }
-
- it should "report timeout when trying to start a executor system on worker, " +
- "and worker doesn't response" in {
- val client = TestProbe()
- val worker = TestProbe()
- val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
- client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1)))
- client.watch(launcher)
- val waitFor = launchExecutorSystemTimeout + 10000
- client.expectMsgType[LaunchExecutorSystemTimeout](waitFor.milliseconds)
- client.expectTerminated(launcher, waitFor.milliseconds)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
deleted file mode 100644
index c2f6ee8..0000000
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._
-import io.gearpump.cluster.appmaster.ExecutorSystemSchedulerSpec.{ExecutorSystemLauncherStarted, MockExecutorSystemLauncher}
-import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppJar, TestUtil}
-import io.gearpump.jarstore.FilePath
-
-class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
- val appId = 0
- val workerId = WorkerId(0, 0L)
- val resource = Resource(1)
- val resourceRequest = ResourceRequest(resource, WorkerId.unspecified)
- val mockJar = AppJar("for_test", FilePath("PATH"))
- val emptyJvmConfig = ExecutorSystemJvmConfig(Array.empty, Array.empty, Some(mockJar), "")
- val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig)
-
- implicit var system: ActorSystem = null
- var worker: TestProbe = null
- var workerInfo: WorkerInfo = null
- var masterProxy: TestProbe = null
- var launcher: TestProbe = null
- var client: TestProbe = null
-
- override def beforeEach(): Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- worker = TestProbe()
- workerInfo = WorkerInfo(workerId, worker.ref)
- masterProxy = TestProbe()
- launcher = TestProbe()
- client = TestProbe()
-
- val scheduler = system.actorOf(
- Props(new ExecutorSystemScheduler(appId, masterProxy.ref, (appId: Int, session: Session) => {
- Props(new MockExecutorSystemLauncher(launcher, session))
- })))
-
- client.send(scheduler, start)
- masterProxy.expectMsg(RequestResource(appId, resourceRequest))
- }
-
- override def afterEach(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- private def launcherStarted(launcher: TestProbe): Option[ExecutorSystemLauncherStarted] = {
- val launcherStarted = launcher.receiveOne(15.seconds)
-
- launcherStarted match {
- case start: ExecutorSystemLauncherStarted => Some(start)
- case x =>
- assert(false, "ExecutorSystemLauncherStarted == false")
- None
- }
- }
-
- it should "schedule and launch an executor system on target worker successfully" in {
-
- masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
-
- val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
-
- var systemId = 0
- launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
-
- val executorSystemProbe = TestProbe()
- val executorSystem =
- ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo)
- launcher.reply(LaunchExecutorSystemSuccess(executorSystem, session))
- client.expectMsg(ExecutorSystemStarted(executorSystem, Some(mockJar)))
- }
-
- it should "report failure when resource cannot be allocated" in {
- client.expectMsg(30.seconds, StartExecutorSystemTimeout)
- }
-
- it should "schedule new resouce on new worker " +
- "when target worker reject creating executor system" in {
- masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
- val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
-
- var systemId = 0
- launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
- launcher.reply(LaunchExecutorSystemRejected(resource, "", session))
- masterProxy.expectMsg(RequestResource(appId, resourceRequest))
- }
-
- it should "report failure when resource is allocated, but timeout " +
- "when starting the executor system" in {
- masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
- val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
-
- var systemId = 0
- launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
- launcher.reply(LaunchExecutorSystemTimeout(session))
- client.expectMsg(StartExecutorSystemTimeout)
- }
-}
-
-object ExecutorSystemSchedulerSpec {
- class MockExecutorSystemLauncher(forwardTo: TestProbe, session: Session) extends Actor {
- forwardTo.ref ! ExecutorSystemLauncherStarted(session)
-
- def receive: Receive = {
- case msg => forwardTo.ref forward msg
- }
- }
-
- case class ExecutorSystemLauncherStarted(session: Session)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
deleted file mode 100644
index 3272b99..0000000
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, _}
-import io.gearpump.cluster.appmaster.MasterConnectionKeeperSpec.ConnectionKeeperTestEnv
-import io.gearpump.cluster.master.MasterProxy.WatchMaster
-
-class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
- implicit var system: ActorSystem = null
- val register = RegisterAppMaster(null, null)
- val appId = 0
-
- override def beforeAll(): Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- private def startMasterConnectionKeeper: ConnectionKeeperTestEnv = {
- val statusChangeSubscriber = TestProbe()
- val master = TestProbe()
-
- val keeper = system.actorOf(Props(
- new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref)))
- statusChangeSubscriber.watch(keeper)
-
- master.expectMsgType[WatchMaster]
-
- // Master is alive, response to RegisterAppMaster
- master.expectMsgType[RegisterAppMaster]
- master.reply(AppMasterRegistered(appId))
-
- // Notify listener that master is alive
- statusChangeSubscriber.expectMsg(MasterConnected)
- ConnectionKeeperTestEnv(master, keeper, statusChangeSubscriber)
- }
-
- it should "start correctly and notify listener that master is alive" in {
- startMasterConnectionKeeper
- }
-
- it should "re-register the appmaster when master is restarted" in {
- import io.gearpump.cluster.master.MasterProxy.MasterRestarted
- val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper
-
- // Master is restarted
- master.send(keeper, MasterRestarted)
- master.expectMsgType[RegisterAppMaster]
- master.reply(AppMasterRegistered(appId))
- masterChangeListener.expectMsg(MasterConnected)
-
- // Recovery from Master restart is transparent to listener
- masterChangeListener.expectNoMsg()
- }
-
- it should "notify listener and then shutdown itself when master is dead" in {
- val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper
-
- // Master is dead
- master.send(keeper, MasterStopped)
-
- // Keeper should tell the listener that master is stopped before shutting down itself
- masterChangeListener.expectMsg(MasterStopped)
- masterChangeListener.expectTerminated(keeper)
- }
-
- it should "mark the master as dead when timeout" in {
- val statusChangeSubscriber = TestProbe()
- val master = TestProbe()
-
- // MasterConnectionKeeper register itself to master by sending RegisterAppMaster
- val keeper = system.actorOf(Props(new MasterConnectionKeeper(register,
- master.ref, statusChangeSubscriber.ref)))
-
- // Master doesn't reply to keeper,
- statusChangeSubscriber.watch(keeper)
-
- // Timeout, keeper notify listener, and then make suicide
- statusChangeSubscriber.expectMsg(60.seconds, MasterStopped)
- statusChangeSubscriber.expectTerminated(keeper, 60.seconds)
- }
-}
-
-object MasterConnectionKeeperSpec {
- case class ConnectionKeeperTestEnv(
- master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe)
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala b/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
deleted file mode 100644
index 7544f9a..0000000
--- a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.main
-
-import org.scalatest.{FlatSpec, Matchers}
-
-class ArgumentParserSpec extends FlatSpec with Matchers {
- it should "parse arguments correctly" in {
-
- val parser = new ArgumentsParser {
- override val options = Array(
- "flag" -> CLIOption[Any]("", required = true),
- "opt1" -> CLIOption[Any]("", required = true),
- "opt2" -> CLIOption[Any]("", required = true))
- }
-
- val result = parser.parse(Array("-flag", "-opt1", "1", "-opt2", "2", "arg1", "arg2"))
- assert(result.getBoolean("flag"))
- assert(result.getInt("opt1") == 1)
- assert(result.getString("opt1") == "1")
- assert(result.getInt("opt2") == 2)
-
- assert(result.remainArgs(0) == "arg1")
- assert(result.remainArgs(1) == "arg2")
- }
-
- it should "handle interleaved options and remain args" in {
-
- val parser = new ArgumentsParser {
- override val options = Array(
- "opt1" -> CLIOption[Any]("", required = true))
- }
-
- val result = parser.parse(Array("-opt1", "1", "xx.MainClass", "-opt2", "2"))
- assert(result.getInt("opt1") == 1)
-
- assert(result.remainArgs.length == 3)
-
- intercept[Exception] {
- parser.parse(Array("-opt2"))
- }
-
- intercept[Exception] {
- parser.parse(Array("-opt2", "2", "-opt1"))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
deleted file mode 100644
index 2a8dba1..0000000
--- a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import scala.util.Success
-
-import akka.actor._
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.util.ActorSystemBooter._
-
-class AppMasterLauncherSpec extends FlatSpec with Matchers
- with BeforeAndAfterEach with MasterHarness {
-
- override def config: Config = TestUtil.DEFAULT_CONFIG
-
- val appId = 1
- val executorId = 2
- var master: TestProbe = null
- var client: TestProbe = null
- var worker: TestProbe = null
- var watcher: TestProbe = null
- var appMasterLauncher: ActorRef = null
-
- override def beforeEach(): Unit = {
- startActorSystem()
- master = createMockMaster()
- client = TestProbe()(getActorSystem)
- worker = TestProbe()(getActorSystem)
- watcher = TestProbe()(getActorSystem)
- appMasterLauncher = getActorSystem.actorOf(AppMasterLauncher.props(appId, executorId,
- TestUtil.dummyApp, None, "username", master.ref, Some(client.ref)))
- watcher watch appMasterLauncher
- master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
- val resource = ResourceAllocated(
- Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
- master.reply(resource)
- worker.expectMsgType[LaunchExecutor]
- }
-
- override def afterEach(): Unit = {
- shutdownActorSystem()
- }
-
- "AppMasterLauncher" should "launch appmaster correctly" in {
- worker.reply(RegisterActorSystem("systempath"))
- worker.expectMsgType[ActorSystemRegistered]
-
- worker.expectMsgType[CreateActor]
- worker.reply(ActorCreated(master.ref, "appmaster"))
-
- client.expectMsg(SubmitApplicationResult(Success(appId)))
- watcher.expectTerminated(appMasterLauncher)
- }
-
- "AppMasterLauncher" should "reallocate resource if executor launch rejected" in {
- worker.reply(ExecutorLaunchRejected(""))
- master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
-
- val resource = ResourceAllocated(
- Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
- master.reply(resource)
- worker.expectMsgType[LaunchExecutor]
-
- worker.reply(RegisterActorSystem("systempath"))
- worker.expectMsgType[ActorSystemRegistered]
-
- worker.expectMsgType[CreateActor]
- worker.reply(CreateActorFailed("", new Exception))
- worker.expectMsgType[ShutdownExecutor]
- assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
- watcher.expectTerminated(appMasterLauncher)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
deleted file mode 100644
index 99cfc37..0000000
--- a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.appmaster.ApplicationState
-
-class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
- "ApplicationState" should "check equal with respect to only appId and attemptId" in {
- val stateA = ApplicationState(0, "application0", 0, null, null, null, "A")
- val stateB = ApplicationState(0, "application0", 0, null, null, null, "B")
- val stateC = ApplicationState(0, "application1", 1, null, null, null, "A")
-
- assert(stateA == stateB)
- assert(stateA.hashCode == stateB.hashCode)
- assert(stateA != stateC)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
deleted file mode 100644
index b007120..0000000
--- a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-class MasterProxySpec {
-
- // Master proxy retries multiple times to find the master
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
deleted file mode 100644
index e4122da..0000000
--- a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-class MasterSpec {
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
deleted file mode 100644
index 3b3265f..0000000
--- a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
-
-class MetricsSpec extends FlatSpec with Matchers with MockitoSugar {
-
- "Counter" should "handle sampleRate == 1" in {
-
- val mockBaseCounter = mock[CodaHaleCounter]
-
- val counter = new Counter("c", mockBaseCounter)
-
- counter.inc()
- counter.inc()
-
- verify(mockBaseCounter, times(2)).inc(1)
- }
-
- "Counter" should "handle sampleRate == 3" in {
-
- val mockBaseCounter = mock[CodaHaleCounter]
-
- val counter = new Counter("c", mockBaseCounter, 3)
-
- counter.inc(1)
- counter.inc(1)
- counter.inc(1)
- counter.inc(1)
- counter.inc(1)
- counter.inc(1)
-
- verify(mockBaseCounter, times(2)).inc(3)
- }
-
- "Histogram" should "handle sampleRate == 1" in {
-
- val mockBaseHistogram = mock[CodaHaleHistogram]
-
- val histogram = new Histogram("h", mockBaseHistogram)
-
- histogram.update(3)
- histogram.update(7)
- histogram.update(5)
- histogram.update(9)
-
- verify(mockBaseHistogram, times(4)).update(anyLong())
- }
-
- "Histogram" should "handle sampleRate > 1" in {
-
- val mockBaseHistogram = mock[CodaHaleHistogram]
-
- val histogram = new Histogram("h", mockBaseHistogram, 2)
-
- histogram.update(3)
- histogram.update(4)
- histogram.update(5)
- histogram.update(6)
-
- verify(mockBaseHistogram, times(1)).update(4L)
- verify(mockBaseHistogram, times(1)).update(6L)
- }
-
- "Meter" should "handle sampleRate == 1" in {
-
- val mockBaseMeter = mock[CodaHaleMeter]
-
- val meter = new Meter("m", mockBaseMeter)
-
- meter.mark()
- meter.mark(3)
-
- verify(mockBaseMeter, times(1)).mark(1L)
- verify(mockBaseMeter, times(1)).mark(3L)
- }
-
- "Meter" should "handle sampleRate > 1" in {
-
- val mockBaseMeter = mock[CodaHaleMeter]
-
- val meter = new Meter("m", mockBaseMeter, 2)
-
- meter.mark(1)
- meter.mark(3)
-
- meter.mark(5)
- meter.mark(7)
-
- verify(mockBaseMeter, times(1)).mark(4L)
- verify(mockBaseMeter, times(1)).mark(12L)
- }
-
- "Metrics" should "have a name" in {
- val metrics = new Metrics(3)
- assert(metrics.counter("counter").name == "counter")
- assert(metrics.histogram("histogram").name == "histogram")
- assert(metrics.meter("meter").name == "meter")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
deleted file mode 100644
index 9509d94..0000000
--- a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.Message
-
-class PartitionerSpec extends FlatSpec with Matchers {
- val NUM = 10
-
- "HashPartitioner" should "hash same key to same slots" in {
- val partitioner = new HashPartitioner
-
- val data = new Array[Byte](1000)
- (new java.util.Random()).nextBytes(data)
- val msg = Message(data)
-
- val partition = partitioner.getPartition(msg, NUM)
- assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
- assert(partition == partitioner.getPartition(msg, NUM), "multiple run should return" +
- "consistent result")
- }
-
- "ShufflePartitioner" should "hash same key randomly" in {
- val partitioner = new ShufflePartitioner
-
- val data = new Array[Byte](1000)
- (new java.util.Random()).nextBytes(data)
- val msg = Message(data)
-
- val partition = partitioner.getPartition(msg, NUM)
- assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
- assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" +
- "consistent result")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala b/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
deleted file mode 100644
index c38a555..0000000
--- a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.security
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-
-class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers {
- it should "authenticate correctly" in {
- val config = TestUtil.UI_CONFIG
- implicit val system = ActorSystem("ConfigFileBasedAuthenticatorSpec", config)
- implicit val ec = system.dispatcher
- val timeout = 30.seconds
-
- val authenticator = new ConfigFileBasedAuthenticator(config)
- val guest = Await.result(authenticator.authenticate("guest", "guest", ec), timeout)
- val admin = Await.result(authenticator.authenticate("admin", "admin", ec), timeout)
-
- val nonexist = Await.result(authenticator.authenticate("nonexist", "nonexist", ec), timeout)
-
- val failedGuest = Await.result(authenticator.authenticate("guest", "wrong", ec), timeout)
- val failedAdmin = Await.result(authenticator.authenticate("admin", "wrong", ec), timeout)
-
- assert(guest == Authenticator.Guest)
- assert(admin == Authenticator.Admin)
- assert(nonexist == Authenticator.UnAuthenticated)
- assert(failedGuest == Authenticator.UnAuthenticated)
- assert(failedAdmin == Authenticator.UnAuthenticated)
-
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala b/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
deleted file mode 100644
index 4a3963b..0000000
--- a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.security
-
-import org.scalatest.{FlatSpec, Matchers}
-
-class PasswordUtilSpec extends FlatSpec with Matchers {
-
- it should "verify the credential correctly" in {
- val password = "password"
-
- val digest1 = PasswordUtil.hash(password)
- val digest2 = PasswordUtil.hash(password)
-
- // Uses different salt each time, thus creating different hash.
- assert(digest1 != digest2)
-
- // Both are valid hash.
- assert(PasswordUtil.verify(password, digest1))
- assert(PasswordUtil.verify(password, digest2))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
deleted file mode 100644
index 3ed6ffa..0000000
--- a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.esotericsoftware.kryo.io.{Input, Output}
-import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
-import io.gearpump.serializer.SerializerSpec._
-
-class SerializerSpec extends FlatSpec with Matchers with MockitoSugar {
- val config = ConfigFactory.empty.withValue("gearpump.serializers",
- ConfigValueFactory.fromAnyRef(Map(classOf[ClassA].getName -> classOf[ClassASerializer].getName,
- classOf[ClassB].getName -> classOf[ClassBSerializer].getName).asJava))
-
- "GearpumpSerialization" should "register custom serializers" in {
- val serialization = new GearpumpSerialization(config)
- val kryo = new Kryo
- serialization.customize(kryo)
-
- val forB = kryo.getRegistration(classOf[ClassB])
- assert(forB.getSerializer.isInstanceOf[ClassBSerializer])
-
- val forA = kryo.getRegistration(classOf[ClassA])
- assert(forA.getSerializer.isInstanceOf[ClassASerializer])
- }
-
- "FastKryoSerializer" should "serialize correctly" in {
- val myConfig = config.withFallback(TestUtil.DEFAULT_CONFIG.withoutPath("gearpump.serializers"))
- val system = ActorSystem("my", myConfig)
-
- val serializer = new FastKryoSerializer(system.asInstanceOf[ExtendedActorSystem])
-
- val bytes = serializer.serialize(new ClassA)
- val anotherA = serializer.deserialize(bytes)
-
- assert(anotherA.isInstanceOf[ClassA])
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
-
-object SerializerSpec {
-
- class ClassA {}
-
- class ClassASerializer extends KryoSerializer[ClassA] {
- override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = {
- output.writeString(classOf[ClassA].getName.toString)
- }
-
- override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = {
- val className = input.readString()
- Class.forName(className).newInstance().asInstanceOf[ClassA]
- }
- }
-
- class ClassB {}
-
- class ClassBSerializer extends KryoSerializer[ClassA] {
- override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = {}
-
- override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = {
- null
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala b/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
deleted file mode 100644
index 71b4218..0000000
--- a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport
-
-import java.io.{DataInput, DataOutput}
-
-import io.gearpump.transport.MockTransportSerializer.NettyMessage
-import io.gearpump.transport.netty.ITransportMessageSerializer
-
-class MockTransportSerializer extends ITransportMessageSerializer {
- override def getLength(obj: scala.Any): Int = 4
-
- override def serialize(dataOutput: DataOutput, transportMessage: scala.Any): Unit = {
- transportMessage match {
- case msg: NettyMessage =>
- dataOutput.writeInt(msg.num)
- }
- }
-
- override def deserialize(dataInput: DataInput, length: Int): AnyRef = {
- NettyMessage(dataInput.readInt())
- }
-}
-
-object MockTransportSerializer {
- case class NettyMessage(num: Int)
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/transport/NettySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/transport/NettySpec.scala b/core/src/test/scala/io/gearpump/transport/NettySpec.scala
deleted file mode 100644
index 6caf357..0000000
--- a/core/src/test/scala/io/gearpump/transport/NettySpec.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.testkit.TestProbe
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.transport.MockTransportSerializer.NettyMessage
-import io.gearpump.transport.netty.{Context, TaskMessage}
-import io.gearpump.util.Util
-
-class NettySpec extends FlatSpec with Matchers with MockitoSugar {
-
- "Netty Transport" should "send and receive message correctly " in {
- val conf = TestUtil.DEFAULT_CONFIG
- val system = ActorSystem("transport", conf)
- val context = new Context(system, conf)
- val serverActor = TestProbe()(system)
-
- val port = Util.findFreePort()
-
- import system.dispatcher
- system.scheduler.scheduleOnce(Duration(1, TimeUnit.SECONDS)) {
- context.bind("server", new ActorLookupById {
- override def lookupLocalActor(id: Long): Option[ActorRef] = Some(serverActor.ref)
- }, false, port.get)
- }
- val client = context.connect(HostPort("127.0.0.1", port.get))
-
- val data = NettyMessage(0)
- val msg = new TaskMessage(0, 1, 2, data)
- client ! msg
- serverActor.expectMsg(15.seconds, data)
-
- context.close()
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala b/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
deleted file mode 100644
index 13530ed..0000000
--- a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem, _}
-import io.gearpump.util.ActorSystemBooterSpec._
-
-class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
-
- "ActorSystemBooter" should "report its address back" in {
- val boot = bootSystem()
- boot.prob.expectMsgType[RegisterActorSystem]
- boot.shutdown()
- }
-
- "ActorSystemBooter" should "terminate itself when parent actor dies" in {
- val boot = bootSystem()
- boot.prob.expectMsgType[RegisterActorSystem]
-
- val dummy = boot.host.actorOf(Props(classOf[Dummy]), "dummy")
- boot.prob.reply(ActorSystemRegistered(boot.prob.ref))
- boot.prob.reply(BindLifeCycle(dummy))
- boot.host.stop(dummy)
- val terminated = retry(5)(boot.bootedSystem.whenTerminated.isCompleted)
- assert(terminated)
- boot.shutdown()
- }
-
- "ActorSystemBooter" should "create new actor" in {
- val boot = bootSystem()
- boot.prob.expectMsgType[RegisterActorSystem]
- boot.prob.reply(ActorSystemRegistered(boot.prob.ref))
- boot.prob.reply(CreateActor(Props(classOf[AcceptThreeArguments], 1, 2, 3), "three"))
- boot.prob.expectMsgType[ActorCreated]
-
- boot.prob.reply(CreateActor(Props(classOf[AcceptZeroArguments]), "zero"))
- boot.prob.expectMsgType[ActorCreated]
-
- boot.shutdown()
- }
-
- private def bootSystem(): Boot = {
- val booter = ActorSystemBooter(TestUtil.DEFAULT_CONFIG)
-
- val system = ActorSystem("reportback", TestUtil.DEFAULT_CONFIG)
-
- val receiver = TestProbe()(system)
- val address = ActorUtil.getFullPath(system, receiver.ref.path)
-
- val bootSystem = booter.boot("booter", address)
-
- Boot(system, receiver, bootSystem)
- }
-
- case class Boot(host: ActorSystem, prob: TestProbe, bootedSystem: ActorSystem) {
- def shutdown(): Unit = {
- host.terminate()
- bootedSystem.terminate()
- Await.result(host.whenTerminated, Duration.Inf)
- Await.result(bootedSystem.whenTerminated, Duration.Inf)
- }
- }
-
- def retry(seconds: Int)(fn: => Boolean): Boolean = {
- val result = fn
- if (result) {
- result
- } else {
- Thread.sleep(1000)
- retry(seconds - 1)(fn)
- }
- }
-}
-
-object ActorSystemBooterSpec {
- class Dummy extends Actor {
- def receive: Receive = {
- case _ =>
- }
- }
-
- class AcceptZeroArguments extends Actor {
- def receive: Receive = {
- case _ =>
- }
- }
-
- class AcceptThreeArguments(a: Int, b: Int, c: Int) extends Actor {
- def receive: Receive = {
- case _ =>
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala b/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
deleted file mode 100644
index 6ab5a2f..0000000
--- a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import org.scalatest.FlatSpec
-
-import io.gearpump.transport.HostPort
-
-class ActorUtilSpec extends FlatSpec {
- "masterActorPath" should "construct the ActorPath from HostPort" in {
- import io.gearpump.util.Constants.MASTER
-
- val host = "127.0.0.1"
- val port = 3000
- val master = HostPort("127.0.0.1", 3000)
- val masterPath = ActorUtil.getMasterActorPath(master)
- assert(masterPath.address.port == Some(port))
- assert(masterPath.address.system == MASTER)
- assert(masterPath.address.host == Some(host))
- assert(masterPath.address.protocol == "akka.tcp")
- assert(masterPath.toStringWithoutAddress == s"/user/$MASTER")
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala b/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
deleted file mode 100644
index 0c798f3..0000000
--- a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.File
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.{ClusterConfig, ClusterConfigSource, UserConfig}
-
-class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar {
- "Typesafe Cluster Configs" should "follow the override rules" in {
-
- val conf =
- """
- gearpump {
- gear = "gearpump"
- }
-
- gearpump-master {
- conf = "master"
- }
- gearpump-worker {
- conf = "worker"
- }
- conf = "base"
- """
-
- val file = File.createTempFile("test", ".conf")
- FileUtils.write(file, conf)
-
- val raw = ClusterConfig.load(ClusterConfigSource(file.toString))
-
- assert(raw.master.getString("conf") == "master", "master > base")
- assert(raw.worker.getString("conf") == "worker", "worker > base")
- assert(raw.default.getString("conf") == "base", "application > base")
-
- file.delete()
- }
-
- "ClusterConfigSource" should "return empty for non-exist files" in {
- val source = ClusterConfigSource("non-exist")
- var config = source.getConfig
- assert(config.isEmpty)
-
- val nullCheck = ClusterConfigSource(null)
- config = nullCheck.getConfig
- assert(config.isEmpty)
- }
-
- "User Config" should "work" in {
-
- implicit val system = ActorSystem("forSerialization")
-
- val map = Map[String, String]("key1" -> "1", "key2" -> "value2")
-
- val user = new UserConfig(map)
- .withLong("key3", 2L)
- .withBoolean("key4", value = true)
- .withFloat("key5", 3.14F)
- .withDouble("key6", 2.718)
-
- assert(user.getInt("key1").get == 1)
- assert(user.getString("key1").get == "1")
- assert(user.getLong("key3").get == 2L)
- assert(user.getBoolean("key4").get == true)
- assert(user.getFloat("key5").get == 3.14F)
- assert(user.getDouble("key6").get == 2.718)
-
- val data = new ConfigsSpec.Data(3)
- assert(data == user.withValue("data", data).getValue[ConfigsSpec.Data]("data").get)
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
-
-object ConfigsSpec {
- case class Data(value: Int)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
deleted file mode 100644
index 30e42c7..0000000
--- a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.File
-import java.util
-
-import org.scalatest.FlatSpec
-
-import io.gearpump.google.common.io.Files
-
-class FileUtilsSpec extends FlatSpec {
- val TXT =
- """
- |This is a multiple line
- |text
- |
- """.stripMargin
-
- it should "read/write string correctly" in {
- val file = File.createTempFile("fileutilspec", ".test")
- FileUtils.write(file, TXT)
- assert(FileUtils.read(file) == TXT)
- file.delete()
- }
-
- it should "read/write bytes array correctly" in {
- val file = File.createTempFile("fileutilspec", ".test")
- val bytes = TXT.toCharArray.map(_.toByte)
- FileUtils.writeByteArrayToFile(file, bytes)
- util.Arrays.equals(bytes, FileUtils.readFileToByteArray(file))
- file.delete()
- }
-
- it should "create directory and all parents" in {
- val temp = Files.createTempDir()
- val parent = new File(temp, "sub1")
- val child = new File(parent, "sub2" + File.separator)
- FileUtils.forceMkdir(child)
- assert(child.exists())
- assert(child.isDirectory)
- child.delete()
- parent.delete()
- temp.delete()
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/GraphSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/GraphSpec.scala b/core/src/test/scala/io/gearpump/util/GraphSpec.scala
deleted file mode 100644
index 20eab6d..0000000
--- a/core/src/test/scala/io/gearpump/util/GraphSpec.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.util.Graph.{Node, Path}
-
-class GraphSpec extends PropSpec with PropertyChecks with Matchers {
-
- case class Vertex(id: Int)
- case class Edge(from: Int, to: Int)
-
- val vertexCount = 100
-
- property("Graph with no edges should be built correctly") {
- val vertexSet = Set("A", "B", "C")
- val graph = Graph(vertexSet.toSeq.map(Node): _*)
- graph.vertices.toSet shouldBe vertexSet
- }
-
- property("Graph with vertices and edges should be built correctly") {
- val vertices: Array[Vertex] = 0.until(vertexCount).map(Vertex).toArray
- val genEdge = for {
- from <- Gen.chooseNum[Int](0, vertexCount - 1)
- to <- Gen.chooseNum[Int](0, vertexCount - 1)
- } yield Edge(from, to)
-
- var graphElements = Array.empty[Path[Vertex, _ <: Edge]]
- val outDegrees = new Array[Int](vertices.length)
- val outGoingEdges = vertices.map(_ => Set.empty[(Vertex, Edge, Vertex)])
- val edgesOf = vertices.map(_ => Set.empty[(Vertex, Edge, Vertex)])
- vertices.foreach { v =>
- graphElements :+= Node(v)
- }
-
- forAll(genEdge) {
- e: Edge =>
- val from = vertices(e.from)
- val to = vertices(e.to)
- graphElements :+= from ~ e ~> to
- outDegrees(e.from) += 1
-
- val nodeEdgeNode = (from, e, to)
- outGoingEdges(e.from) += nodeEdgeNode
-
- edgesOf(e.from) += nodeEdgeNode
- edgesOf(e.to) += nodeEdgeNode
- }
-
- val graph: Graph[Vertex, Edge] = Graph(graphElements: _*)
- graph.vertices should contain theSameElementsAs vertices
-
- 0.until(vertices.size).foreach { i =>
- val v = vertices(i)
- graph.outgoingEdgesOf(v) should contain theSameElementsAs outGoingEdges(i)
- graph.edgesOf(v).sortBy(_._1.id)
- graph.edgesOf(v) should contain theSameElementsAs edgesOf(i)
- }
- }
-
- property("Check empty graph") {
- val graph = Graph.empty[String, String]
- assert(graph.isEmpty)
- }
-
- property("check level map for a graph") {
- val graph = Graph.empty[String, String]
-
- val defaultEdge = "edge"
-
- graph.addVertex("A")
- graph.addVertex("B")
- graph.addVertex("C")
-
- graph.addEdge("A", defaultEdge, "B")
- graph.addEdge("B", defaultEdge, "C")
- graph.addEdge("A", defaultEdge, "C")
-
- graph.addVertex("D")
- graph.addVertex("E")
- graph.addVertex("F")
-
- graph.addEdge("D", defaultEdge, "E")
- graph.addEdge("E", defaultEdge, "F")
- graph.addEdge("D", defaultEdge, "F")
-
- graph.addEdge("C", defaultEdge, "E")
-
- val levelMap = graph.vertexHierarchyLevelMap()
-
- // Check whether the rule holds: : if vertex A -> B, then level(A) < level(B)
- levelMap("A") < levelMap("B")
- levelMap("A") < levelMap("C")
- levelMap("B") < levelMap("C")
-
- levelMap("D") < levelMap("E")
- levelMap("D") < levelMap("F")
- levelMap("E") < levelMap("F")
-
- levelMap("C") < levelMap("F")
- }
-
- property("copy should return a immutalbe new Graph") {
- val graph = Graph.empty[String, String]
- val defaultEdge = "edge"
- graph.addVertex("A")
- graph.addVertex("B")
- graph.addEdge("A", defaultEdge, "B")
-
- val newGraph = graph.copy
- newGraph.addVertex("C")
-
- assert(!graph.vertices.toSet.contains("C"), "Graph should be immutable")
- }
-
- property("subGraph should return a sub-graph for certain vertex") {
- val graph = Graph.empty[String, String]
- val defaultEdge = "edge"
- graph.addVertex("A")
- graph.addVertex("B")
- graph.addVertex("C")
- graph.addEdge("A", defaultEdge, "B")
- graph.addEdge("B", defaultEdge, "C")
- graph.addEdge("A", defaultEdge, "C")
-
- val subGraph = graph.subGraph("C")
- assert(subGraph.outDegreeOf("A") != graph.outDegreeOf("A"))
- }
-
- property("replaceVertex should hold all upstream downstream relation for a vertex") {
- val graph = Graph.empty[String, String]
- val defaultEdge = "edge"
- graph.addVertex("A")
- graph.addVertex("B")
- graph.addVertex("C")
- graph.addEdge("A", defaultEdge, "B")
- graph.addEdge("B", defaultEdge, "C")
-
- val newGraph = graph.copy.replaceVertex("B", "D")
- assert(newGraph.inDegreeOf("D") == graph.inDegreeOf("B"))
- assert(newGraph.outDegreeOf("D") == graph.outDegreeOf("B"))
- }
-
- property("Cycle detecting should work properly") {
- val graph = Graph.empty[String, String]
- val defaultEdge = "edge"
- graph.addVertex("A")
- graph.addVertex("B")
- graph.addVertex("C")
- graph.addEdge("A", defaultEdge, "B")
- graph.addEdge("B", defaultEdge, "C")
-
- assert(!graph.hasCycle())
-
- graph.addEdge("C", defaultEdge, "B")
- assert(graph.hasCycle())
-
- graph.addEdge("C", defaultEdge, "A")
- assert(graph.hasCycle())
- }
-
- property("topologicalOrderIterator and topologicalOrderWithCirclesIterator method should " +
- "return equal order of graph with no circle") {
- val graph = Graph(1 ~> 2 ~> 3, 4 ~> 2, 2 ~> 5)
- val topoNoCircles = graph.topologicalOrderIterator
- val topoWithCircles = graph.topologicalOrderWithCirclesIterator
-
- assert(topoNoCircles.zip(topoWithCircles).forall(x => x._1 == x._2))
- }
-
- property("Topological sort of graph with circles should work properly") {
- val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7,
- 4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10)
- val topoWithCircles = graph.topologicalOrderWithCirclesIterator
- val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6, 5, 7, 10, 9)
-
- assert(trueTopoWithCircles.zip(topoWithCircles).forall(x => x._1 == x._2))
- }
-
- property("Duplicated edges detecting should work properly") {
- val graph = Graph.empty[String, String]
- val defaultEdge = "edge"
- val anotherEdge = "edge2"
- graph.addVertex("A")
- graph.addVertex("B")
- graph.addEdge("A", defaultEdge, "B")
-
- assert(!graph.hasDuplicatedEdge())
-
- graph.addEdge("A", anotherEdge, "B")
-
- assert(graph.hasDuplicatedEdge())
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala b/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
deleted file mode 100644
index ef362ff..0000000
--- a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.concurrent.duration._
-
-import akka.actor._
-import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.TestUtil
-
-class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
- with WordSpecLike with Matchers with BeforeAndAfterAll {
-
- def this() = this(ActorSystem("WorkerSpec", TestUtil.DEFAULT_CONFIG))
- val mockActor = TestProbe()
-
- override def afterAll {
- TestKit.shutdownActorSystem(system)
- }
-
- "The TimeOutScheduler" should {
- "handle the time out event" in {
- val testActorRef = TestActorRef(Props(classOf[TestActor], mockActor.ref))
- val testActor = testActorRef.underlyingActor.asInstanceOf[TestActor]
- testActor.sendMsgToIgnore()
- mockActor.expectMsg(30.seconds, MessageTimeOut)
- }
- }
-}
-
-case object Echo
-case object MessageTimeOut
-
-class TestActor(mock: ActorRef) extends Actor with TimeOutScheduler {
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- val target = context.actorOf(Props(classOf[EchoActor]))
-
- override def receive: Receive = {
- case _ =>
- }
-
- def sendMsgToIgnore(): Unit = {
- sendMsgWithTimeOutCallBack(target, Echo, 2000, sendMsgTimeOut())
- }
-
- private def sendMsgTimeOut(): Unit = {
- mock ! MessageTimeOut
- }
-}
-
-class EchoActor extends Actor {
- override def receive: Receive = {
- case _ =>
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/UtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/UtilSpec.scala b/core/src/test/scala/io/gearpump/util/UtilSpec.scala
deleted file mode 100644
index b5bde04..0000000
--- a/core/src/test/scala/io/gearpump/util/UtilSpec.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Util._
-
-class UtilSpec extends FlatSpec with Matchers with MockitoSugar {
- it should "work" in {
-
- assert(findFreePort().isSuccess)
-
- assert(randInt() != randInt())
-
- val hosts = parseHostList("host1:1,host2:2")
- assert(hosts(1) == HostPort("host2", 2))
-
- assert(Util.getCurrentClassPath.length > 0)
- }
-
- it should "check application name properly" in {
- assert(Util.validApplicationName("_application_1"))
- assert(Util.validApplicationName("application_1_"))
- assert(!Util.validApplicationName("0_application_1"))
- assert(!Util.validApplicationName("_applicat&&ion_1"))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala b/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala
new file mode 100644
index 0000000..6ab7eed
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump
+
+import scala.language.implicitConversions
+
+import akka.actor.{Actor, Props, Terminated}
+import akka.testkit.TestProbe
+
+object TestProbeUtil {
+ implicit def toProps(probe: TestProbe): Props = {
+ Props(new Actor {
+ val probeRef = probe.ref
+ context.watch(probeRef)
+ def receive: Receive = {
+ case Terminated(probeRef) => context.stop(self)
+ case x => probeRef.forward(x)
+ }
+ })
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
new file mode 100644
index 0000000..f2f374e
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import java.io.File
+import java.net.{InetSocketAddress, Socket, SocketTimeoutException, URLClassLoader, UnknownHostException}
+import java.util.Properties
+import java.util.concurrent.{Executors, TimeUnit}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext}
+
+import akka.actor.{Actor, ActorSystem, Address, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory}
+
+import org.apache.gearpump.cluster.MasterHarness.MockMaster
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, FileUtils, LogUtil}
+
+trait MasterHarness {
+ private val LOG = LogUtil.getLogger(getClass)
+
+ implicit val pool = MasterHarness.cachedPool
+
+ private var system: ActorSystem = null
+ private var systemAddress: Address = null
+ private var host: String = null
+ private var port: Int = 0
+ private var masterProperties = new Properties()
+ val PROCESS_BOOT_TIME = Duration(25, TimeUnit.SECONDS)
+
+ def getActorSystem: ActorSystem = system
+ def getHost: String = host
+ def getPort: Int = port
+
+ protected def config: Config
+
+ def startActorSystem(): Unit = {
+ val systemConfig = config
+ system = ActorSystem(MASTER, systemConfig)
+ systemAddress = ActorUtil.getSystemAddress(system)
+ host = systemAddress.host.get
+ port = systemAddress.port.get
+
+ masterProperties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$getHost:$getPort")
+ masterProperties.put(s"${GEARPUMP_HOSTNAME}", s"$getHost")
+
+ LOG.info(s"Actor system is started, $host, $port")
+ }
+
+ def shutdownActorSystem(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ LOG.info(s"Actor system is stopped, $host, $port")
+ }
+
+ def convertTestConf(host: String, port: Int): File = {
+ val test = ConfigFactory.parseResourcesAnySyntax("test.conf",
+ ConfigParseOptions.defaults.setAllowMissing(true))
+
+ val newConf = test.withValue(GEARPUMP_CLUSTER_MASTERS,
+ ConfigValueFactory.fromAnyRef(Array(s"$host:$port").toList.asJava))
+
+ val confFile = File.createTempFile("main", ".conf")
+ val serialized = newConf.root().render()
+ FileUtils.write(confFile, serialized)
+ confFile
+ }
+
+ def createMockMaster(): TestProbe = {
+ val masterReceiver = TestProbe()(system)
+ val master = system.actorOf(Props(classOf[MockMaster], masterReceiver), MASTER)
+ masterReceiver
+ }
+
+ def isPortUsed(host: String, port: Int): Boolean = {
+
+ var isPortUsed = true
+ val socket = new Socket()
+ try {
+ socket.setReuseAddress(true)
+ socket.connect(new InetSocketAddress(host, port), 1000)
+ socket.isConnected
+ } catch {
+ case ex: SocketTimeoutException =>
+ isPortUsed = false
+ case ex: UnknownHostException =>
+ isPortUsed = false
+ case ex: Throwable =>
+ // For other case, we think the port has been occupied.
+ isPortUsed = true
+ } finally {
+ socket.close()
+ }
+ isPortUsed
+ }
+
+ def getContextClassPath: Array[String] = {
+ val contextLoader = Thread.currentThread().getContextClassLoader()
+
+ val urlLoader = if (!contextLoader.isInstanceOf[URLClassLoader]) {
+ contextLoader.getParent.asInstanceOf[URLClassLoader]
+ } else {
+ contextLoader.asInstanceOf[URLClassLoader]
+ }
+
+ val urls = urlLoader.getURLs()
+ val classPath = urls.map { url =>
+ new File(url.getPath()).toString
+ }
+ classPath
+ }
+
+ /**
+ * Remove trailing $
+ */
+ def getMainClassName(mainObj: Any): String = {
+ mainObj.getClass.getName.dropRight(1)
+ }
+
+ def getMasterListOption(): Array[String] = {
+ masterProperties.asScala.toList.map { kv =>
+ s"-D${kv._1}=${kv._2}"
+ }.toArray
+ }
+
+ def masterConfig: Config = {
+ ConfigFactory.parseProperties(masterProperties).withFallback(system.settings.config)
+ }
+}
+
+object MasterHarness {
+
+ val cachedPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
+
+ class MockMaster(receiver: TestProbe) extends Actor {
+ def receive: Receive = {
+ case msg => {
+ receiver.ref forward msg
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala b/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala
new file mode 100644
index 0000000..5d8727e
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import akka.actor._
+
+object TestUtil {
+ val DEFAULT_CONFIG = ClusterConfig.default("test.conf")
+ val MASTER_CONFIG = ClusterConfig.master("test.conf")
+ val UI_CONFIG = ClusterConfig.ui("test.conf")
+
+ class DummyAppMaster(context: AppMasterContext, app: AppDescription) extends ApplicationMaster {
+ context.masterProxy !(context, app)
+
+ def receive: Receive = null
+ }
+
+ val dummyApp: AppDescription =
+ AppDescription("dummy", classOf[DummyAppMaster].getName, UserConfig.empty)
+}
\ No newline at end of file