You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:33 UTC
[23/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
index 1869eab..7039b71 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/MessageDecoder.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,8 +21,7 @@ package io.gearpump.streaming.transaction.api
import io.gearpump.Message
/**
- * MessageDecoder decodes raw bytes to Message
- * It is usually written by end user and
+ * MessageDecoder decodes raw bytes to Message It is usually written by end user and
* passed into TimeReplayableSource
*/
trait MessageDecoder extends java.io.Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
index 29656dd..412ddcc 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetManager.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,28 +18,27 @@
package io.gearpump.streaming.transaction.api
-import io.gearpump.{Message, TimeStamp}
-
import scala.util.Try
+import io.gearpump.{Message, TimeStamp}
+
/**
- * filter offsets and store the mapping from timestamp to offset
+ * Filters offsets and store the mapping from timestamp to offset
*/
trait MessageFilter {
def filter(messageAndOffset: (Message, Long)): Option[Message]
}
/**
- * resolve timestamp to offset by look up the underlying storage
+ * Resolves timestamp to offset by look up the underlying storage
*/
trait OffsetTimeStampResolver {
def resolveOffset(time: TimeStamp): Try[Long]
}
/**
- * manages message's offset on TimeReplayableSource and timestamp
+ * Manages message's offset on TimeReplayableSource and timestamp
*/
trait OffsetManager extends MessageFilter with OffsetTimeStampResolver {
def close(): Unit
-}
-
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
index aa9966e..fa7161c 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/OffsetStorage.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,10 @@
package io.gearpump.streaming.transaction.api
-import io.gearpump.TimeStamp
-
import scala.util.Try
+import io.gearpump.TimeStamp
+
object OffsetStorage {
/**
@@ -47,15 +47,17 @@ object OffsetStorage {
*/
trait OffsetStorage {
/**
- * try to look up the time in the OffsetStorage
- * return the corresponding Offset if the time is
- * in the range of stored TimeStamps or one of the
- * failure info (StorageEmpty, Overflow, Underflow)
+ * Tries to look up the time in the OffsetStorage return the corresponding Offset if the time is
+ * in the range of stored TimeStamps or one of the failure info (StorageEmpty, Overflow,
+ * Underflow)
+ *
* @param time the time to look for
* @return the corresponding offset if the time is in the range, otherwise failure
*/
def lookUp(time: TimeStamp): Try[Array[Byte]]
+
def append(time: TimeStamp, offset: Array[Byte]): Unit
+
def close(): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
index b6c1e6c..50711ee 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeReplayableSource.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,11 +21,10 @@ package io.gearpump.streaming.transaction.api
import io.gearpump.streaming.source.DataSource
/**
- * AT-LEAST-ONCE API
+ * AT-LEAST-ONCE API. Represents a data source which allow replaying.
*
- * subclass should be able to replay messages on recovery from the time
- * when an application crashed
+ * Subclass should be able to replay messages on recovery from the time
+ * when an application crashed.
*/
trait TimeReplayableSource extends DataSource
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
index 7df508c..7c34e1a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/transaction/api/TimeStampFilter.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,10 +21,9 @@ package io.gearpump.streaming.transaction.api
import io.gearpump.{Message, TimeStamp}
/**
- * TimeStampFilter filters message comparing its TimeStamp with the predicate.
+ * TimeStampFilter filters out messages that are obsolete.
*/
trait TimeStampFilter extends java.io.Serializable {
def filter(msg: Message, predicate: TimeStamp): Option[Message]
}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
index 5edcf38..c2ac32a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/util/ActorPathUtil.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,16 +19,22 @@
package io.gearpump.streaming.util
import akka.actor.{ActorPath, ActorRef}
+
import io.gearpump.streaming.task.TaskId
object ActorPathUtil {
- def executorActorName(executorId: Int) = executorId.toString
+ def executorActorName(executorId: Int): String = executorId.toString
- def taskActorName(taskId: TaskId) = s"processor_${taskId.processorId}_task_${taskId.index}"
+ def taskActorName(taskId: TaskId): String = {
+ s"processor_${taskId.processorId}_task_${taskId.index}"
+ }
def taskActorPath(appMaster: ActorRef, executorId: Int, taskId: TaskId): ActorPath = {
- appMaster.path.child(executorManagerActorName).child(executorActorName(executorId)).child(taskActorName(taskId))
+ val executorManager = appMaster.path.child(executorManagerActorName)
+ val executor = executorManager.child(executorActorName(executorId))
+ val task = executor.child(taskActorName(taskId))
+ task
}
def executorManagerActorName: String = "executors"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
index 0207fb3..13b8e34 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,13 +18,14 @@
package io.gearpump.streaming
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
import io.gearpump.partitioner.PartitionerDescription
import io.gearpump.streaming.task.TaskId
import io.gearpump.util.Graph
import io.gearpump.util.Graph.Node
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
class DAGSpec extends PropSpec with PropertyChecks with Matchers {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
index a5067f2..7938415 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,12 @@
*/
package io.gearpump.streaming
-import io.gearpump.streaming.task._
-import io.gearpump.transport.netty.WrappedChannelBuffer
import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers}
import org.scalatest.{Matchers, WordSpec}
+import io.gearpump.streaming.task._
+import io.gearpump.transport.netty.WrappedChannelBuffer
+
class MessageSerializerSpec extends WordSpec with Matchers {
def testSerializer[T](obj: T, taskMessageSerializer: TaskMessageSerializer[T]): T = {
@@ -32,7 +33,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
taskMessageSerializer.read(input)
}
- "SerializedMessageSerializer" should {
+ "SerializedMessageSerializer" should {
"serialize and deserialize SerializedMessage properly" in {
val serializer = new SerializedMessageSerializer
val data = new Array[Byte](256)
@@ -43,7 +44,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
}
}
- "TaskIdSerializer" should {
+ "TaskIdSerializer" should {
"serialize and deserialize TaskId properly" in {
val taskIdSerializer = new TaskIdSerializer
val taskId = TaskId(1, 3)
@@ -51,7 +52,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
}
}
- "AckRequestSerializer" should {
+ "AckRequestSerializer" should {
"serialize and deserialize AckRequest properly" in {
val serializer = new AckRequestSerializer
val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024)
@@ -59,7 +60,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
}
}
- "InitialAckRequestSerializer" should {
+ "InitialAckRequestSerializer" should {
"serialize and deserialize AckRequest properly" in {
val serializer = new InitialAckRequestSerializer
val ackRequest = InitialAckRequest(TaskId(1, 2), 1024)
@@ -67,7 +68,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
}
}
- "AckSerializer" should {
+ "AckSerializer" should {
"serialize and deserialize Ack properly" in {
val serializer = new AckSerializer
val ack = Ack(TaskId(1, 2), 1024, 1023, 1799)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
index 6577718..40310f7 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +17,12 @@
*/
package io.gearpump.streaming
-import akka.actor.{ActorSystem, Actor}
-import akka.testkit.{TestProbe, TestActorRef}
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.task.TaskId
+import akka.actor.{Actor, ActorSystem}
+import akka.testkit.TestActorRef
+import org.mockito.{ArgumentMatcher, Matchers, Mockito}
+
import io.gearpump.cluster.TestUtil
-import org.mockito.{Mockito, ArgumentMatcher}
-import org.mockito.Mockito
-import org.mockito.Matchers
+import io.gearpump.streaming.task.{TaskContext, TaskId}
object MockUtil {
@@ -39,7 +37,7 @@ object MockUtil {
context
}
- def argMatch[T](func: T => Boolean) : T = {
+ def argMatch[T](func: T => Boolean): T = {
Matchers.argThat(new ArgumentMatcher[T] {
override def matches(param: Any): Boolean = {
val mesage = param.asInstanceOf[T]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
index 6866907..2ea8b84 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,10 +19,11 @@ package io.gearpump.streaming
import akka.actor._
import akka.testkit.TestActorRef
+
import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
import io.gearpump.cluster.appmaster.AppMasterRuntimeInfo
import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.{MiniCluster, AppDescription, AppMasterContext, UserConfig}
+import io.gearpump.cluster.{AppDescription, AppMasterContext, MiniCluster, UserConfig}
import io.gearpump.streaming.appmaster.AppMaster
import io.gearpump.util.Graph
@@ -33,7 +34,8 @@ object StreamingTestUtil {
def startAppMaster(miniCluster: MiniCluster, appId: Int): TestActorRef[AppMaster] = {
implicit val actorSystem = miniCluster.system
- val masterConf = AppMasterContext(appId, testUserName, Resource(1), null, None,miniCluster.mockMaster,AppMasterRuntimeInfo(appId, appName = appId.toString))
+ val masterConf = AppMasterContext(appId, testUserName, Resource(1), null,
+ None, miniCluster.mockMaster, AppMasterRuntimeInfo(appId, appName = appId.toString))
val app = StreamApplication("test", Graph.empty, UserConfig.empty)
val appDescription = AppDescription(app.name, app.appMaster.getName, app.userConfig)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
index d1d7006..90fb8ab 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,9 +17,13 @@
*/
package io.gearpump.streaming.appmaster
+import scala.concurrent.duration._
+
import akka.actor.{ActorRef, Props}
import akka.testkit.{TestActorRef, TestProbe}
-import io.gearpump.{WorkerId, Message}
+import org.scalatest._
+
+import io.gearpump.Message
import io.gearpump.cluster.AppMasterToMaster._
import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
import io.gearpump.cluster.ClientToMaster.ShutdownApplication
@@ -29,19 +33,16 @@ import io.gearpump.cluster._
import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
import io.gearpump.cluster.master.MasterProxy
import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.jarstore.FilePath
import io.gearpump.partitioner.HashPartitioner
import io.gearpump.streaming.task.{StartTime, TaskContext, _}
import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.util.Graph
import io.gearpump.util.Graph._
-import org.scalatest._
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
- override def config = TestUtil.DEFAULT_CONFIG
+ protected override def config = TestUtil.DEFAULT_CONFIG
var appMaster: ActorRef = null
@@ -63,7 +64,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
var appMasterContext: AppMasterContext = null
var appMasterRuntimeInfo: AppMasterRuntimeInfo = null
- override def beforeEach() = {
+ override def beforeEach(): Unit = {
startActorSystem()
mockTask = TestProbe()(getActorSystem)
@@ -76,50 +77,54 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
implicit val system = getActorSystem
conf = UserConfig.empty.withValue(AppMasterSpec.MASTER, mockMaster.ref)
val mockJar = AppJar("for_test", FilePath("path"))
- appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar), mockMaster.ref, appMasterRuntimeInfo)
+ appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar),
+ mockMaster.ref, appMasterRuntimeInfo)
val graph = Graph(taskDescription1 ~ partitioner ~> taskDescription2)
val streamApp = StreamApplication("test", graph, conf)
appDescription = Application.ApplicationToAppDescription(streamApp)
import scala.concurrent.duration._
- mockMasterProxy = getActorSystem.actorOf(
- Props(new MasterProxy(List(mockMaster.ref.path), 30 seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
+ mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path),
+ 30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
TestActorRef[AppMaster](
- AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), appDescription, appMasterContext))(getActorSystem)
+ AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), appDescription,
+ appMasterContext))(getActorSystem)
- val registerAppMaster = mockMaster.receiveOne(15 seconds)
+ val registerAppMaster = mockMaster.receiveOne(15.seconds)
assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
mockMaster.reply(AppMasterRegistered(appId))
- mockMaster.expectMsg(15 seconds, GetAppData(appId, "DAG"))
+ mockMaster.expectMsg(15.seconds, GetAppData(appId, "DAG"))
mockMaster.reply(GetAppDataResult("DAG", null))
- mockMaster.expectMsg(15 seconds, GetAppData(appId, "startClock"))
+ mockMaster.expectMsg(15.seconds, GetAppData(appId, "startClock"))
mockMaster.reply(GetAppDataResult("startClock", 0L))
- mockMaster.expectMsg(15 seconds, RequestResource(appId, ResourceRequest(Resource(4), workerId = WorkerId.unspecified)))
+ mockMaster.expectMsg(15.seconds, RequestResource(appId, ResourceRequest(Resource(4),
+ workerId = WorkerId.unspecified)))
}
- override def afterEach() = {
+ override def afterEach(): Unit = {
shutdownActorSystem()
}
"AppMaster" should {
"kill it self when allocate resource time out" in {
- mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2), mockWorker.ref, workerId))))
- mockMaster.expectMsg(60 seconds, ShutdownApplication(appId))
+ mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2),
+ mockWorker.ref, workerId))))
+ mockMaster.expectMsg(60.seconds, ShutdownApplication(appId))
}
"reschedule the resource when the worker reject to start executor" in {
val resource = Resource(4)
- mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker.ref, workerId))))
+ mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource,
+ mockWorker.ref, workerId))))
mockWorker.expectMsgClass(classOf[LaunchExecutor])
mockWorker.reply(ExecutorLaunchRejected(""))
mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)))
}
"find a new master when lost connection with master" in {
- println(config.getList("akka.loggers"))
val watcher = TestProbe()(getActorSystem)
watcher.watch(mockMasterProxy)
@@ -130,74 +135,74 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
Thread.sleep(2000)
import scala.concurrent.duration._
- mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path), 30 seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
- mockMaster.expectMsgClass(15 seconds, classOf[RegisterAppMaster])
- }
-
- /*
-
- TODO: This test is failing on Travis randomly
- We have not identifed the root cause.
- Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843
- Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733
-
- "launch executor and task properly" in {
- mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref, workerId))))
- mockWorker.expectMsgClass(classOf[LaunchExecutor])
-
- val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG)
- mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString))
- for (i <- 1 to 4) {
- mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted)
- }
-
- //clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0
- appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
-
- //there is no further upstream, so the upstreamMinClock is Long.MaxValue
- mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
-
- // check min clock
- appMaster.tell(GetLatestMinClock, mockTask.ref)
- mockTask.expectMsg(LatestMinClock(0))
-
-
- //clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0
- appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
-
- //there is no further upstream, so the upstreamMinClock is Long.MaxValue
- mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
-
- // check min clock
- appMaster.tell(GetLatestMinClock, mockTask.ref)
- mockTask.expectMsg(LatestMinClock(0))
-
- //clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0
- appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
-
- // min clock of processor 0 (Task(0, 0) and Task(0, 1))
- mockTask.expectMsg(UpstreamMinClock(1))
-
- // check min clock
- appMaster.tell(GetLatestMinClock, mockTask.ref)
- mockTask.expectMsg(LatestMinClock(0))
-
- //clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1
- appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
-
- // min clock of processor 0 (Task(0, 0) and Task(0, 1))
- mockTask.expectMsg(UpstreamMinClock(1))
-
- // check min clock
- appMaster.tell(GetLatestMinClock, mockTask.ref)
- mockTask.expectMsg(LatestMinClock(1))
-
- //shutdown worker and all executor on this work, expect appmaster to ask for new resources
- workerSystem.shutdown()
- mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation = Relaxation.ONEWORKER)))
+ mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path),
+ 30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
+ mockMaster.expectMsgClass(15.seconds, classOf[RegisterAppMaster])
}
-**/
+ // // TODO: This test is failing on Travis randomly
+ // // We have not identifed the root cause.
+ // // Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843
+ // // Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733
+ //
+ // "launch executor and task properly" in {
+ // mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref,
+ // workerId))))
+ // mockWorker.expectMsgClass(classOf[LaunchExecutor])
+ //
+ // val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG)
+ // mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString))
+ // for (i <- 1 to 4) {
+ // mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted)
+ // }
+ //
+ // // clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0
+ // appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
+ //
+ // // there is no further upstream, so the upstreamMinClock is Long.MaxValue
+ // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
+ //
+ // // check min clock
+ // appMaster.tell(GetLatestMinClock, mockTask.ref)
+ // mockTask.expectMsg(LatestMinClock(0))
+ //
+ //
+ // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0
+ // appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
+ //
+ // // there is no further upstream, so the upstreamMinClock is Long.MaxValue
+ // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
+ //
+ // // check min clock
+ // appMaster.tell(GetLatestMinClock, mockTask.ref)
+ // mockTask.expectMsg(LatestMinClock(0))
+ //
+ // // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0
+ // appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
+ //
+ // // Min clock of processor 0 (Task(0, 0) and Task(0, 1))
+ // mockTask.expectMsg(UpstreamMinClock(1))
+ //
+ // // check min clock
+ // appMaster.tell(GetLatestMinClock, mockTask.ref)
+ // mockTask.expectMsg(LatestMinClock(0))
+ //
+ // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1
+ // appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
+ //
+ // // min clock of processor 0 (Task(0, 0) and Task(0, 1))
+ // mockTask.expectMsg(UpstreamMinClock(1))
+ //
+ // // check min clock
+ // appMaster.tell(GetLatestMinClock, mockTask.ref)
+ // mockTask.expectMsg(LatestMinClock(1))
+ //
+ // // shutdown worker and all executor on this work, expect appmaster to ask
+ // // for new resources
+ // workerSystem.shutdown()
+ // mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation =
+ // Relaxation.ONEWORKER)))
+ // }
}
def ignoreSaveAppData: PartialFunction[Any, Boolean] = {
@@ -212,7 +217,7 @@ object AppMasterSpec {
val MOCK_MASTER_PROXY = "mockMasterProxy"
}
-class TaskA(taskContext : TaskContext, userConf : UserConfig) extends Task(taskContext, userConf) {
+class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
override def onStart(startTime: StartTime): Unit = {
@@ -222,7 +227,7 @@ class TaskA(taskContext : TaskContext, userConf : UserConfig) extends Task(taskC
override def onNext(msg: Message): Unit = {}
}
-class TaskB(taskContext : TaskContext, userConf : UserConfig) extends Task(taskContext, userConf) {
+class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
override def onStart(startTime: StartTime): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
index a451053..01744a3 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,24 +17,24 @@
*/
package io.gearpump.streaming.appmaster
+import scala.concurrent.{Future, Promise}
+
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import io.gearpump.streaming.task.{GetStartClock, UpstreamMinClock, GetLatestMinClock}
-import io.gearpump.cluster.{UserConfig, TestUtil}
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import io.gearpump.cluster.{TestUtil, UserConfig}
import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock}
import io.gearpump.streaming.appmaster.ClockServiceSpec.Store
import io.gearpump.streaming.storage.AppDataStore
-import io.gearpump.streaming.task._
-import io.gearpump.streaming.{LifeTime, DAG, ProcessorDescription}
+import io.gearpump.streaming.task.{GetLatestMinClock, GetStartClock, UpstreamMinClock, _}
+import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription}
import io.gearpump.util.Graph
import io.gearpump.util.Graph._
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import scala.concurrent.{Future, Promise}
class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
- with WordSpecLike with Matchers with BeforeAndAfterAll{
+ with WordSpecLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("ClockServiceSpec", TestUtil.DEFAULT_CONFIG))
@@ -50,46 +50,48 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
"The ClockService" should {
"maintain a global view of message timestamp in the application" in {
val store = new Store()
- val startClock = 100L
+ val startClock = 100L
store.put(ClockService.START_CLOCK, startClock)
val clockService = system.actorOf(Props(new ClockService(dag, store)))
clockService ! GetLatestMinClock
expectMsg(LatestMinClock(startClock))
- //task(0,0): clock(101); task(1,0): clock(100)
+ // task(0,0): clock(101); task(1,0): clock(100)
clockService ! UpdateClock(TaskId(0, 0), 101)
- // there is no upstream, so pick Long.MaxValue
+ // There is no upstream, so pick Long.MaxValue
expectMsg(UpstreamMinClock(Long.MaxValue))
- // min clock is updated
+ // Min clock is updated
clockService ! GetLatestMinClock
expectMsg(LatestMinClock(100))
-
- //task(0,0): clock(101); task(1,0): clock(101)
+ // task(0,0): clock(101); task(1,0): clock(101)
clockService ! UpdateClock(TaskId(1, 0), 101)
- //upstream is Task(0, 0), 101
+ // Upstream is Task(0, 0), 101
expectMsg(UpstreamMinClock(101))
- // min clock is updated
+ // Min clock is updated
clockService ! GetLatestMinClock
expectMsg(LatestMinClock(101))
}
"act on ChangeToNewDAG and make sure downstream clock smaller than upstreams" in {
val store = new Store()
- val startClock = 100L
+ val startClock = 100L
store.put(ClockService.START_CLOCK, startClock)
val clockService = system.actorOf(Props(new ClockService(dag, store)))
val task = TestProbe()
clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref)
task.expectMsgType[UpstreamMinClock]
- val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, parallelism = 1)
- val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, parallelism = 1)
- val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName, parallelism = 1)
+ val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
+ parallelism = 1)
+ val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName,
+ parallelism = 1)
+ val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName,
+ parallelism = 1)
val dagAddMiddleNode = DAG(Graph(
task1 ~ hash ~> task2,
task1 ~ hash ~> task3,
@@ -100,12 +102,12 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
val user = TestProbe()
clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref)
- val clocks = user.expectMsgPF(){
+ val clocks = user.expectMsgPF() {
case ChangeToNewDAGSuccess(clocks) =>
clocks
}
- // for intermediate task, pick its upstream as initial clock
+ // For intermediate task, pick its upstream as initial clock
assert(clocks(task3.id) == clocks(task1.id))
// For sink task, pick its upstream as initial clock
@@ -117,7 +119,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
"maintain global checkpoint time" in {
val store = new Store()
- val startClock = 100L
+ val startClock = 100L
store.put(ClockService.START_CLOCK, startClock)
val clockService = system.actorOf(Props(new ClockService(dag, store)))
clockService ! UpdateClock(TaskId(0, 0), 200L)
@@ -129,8 +131,10 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
expectMsg(StartClock(200L))
val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", true)
- val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, parallelism = 1, taskConf = conf)
- val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, parallelism = 1, taskConf = conf)
+ val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
+ parallelism = 1, taskConf = conf)
+ val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName,
+ parallelism = 1, taskConf = conf)
val dagWithStateTasks = DAG(Graph(
task1 ~ hash ~> task2,
task1 ~ hash ~> task3,
@@ -184,14 +188,14 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
val sourceClock = new ProcessorClock(0, LifeTime.Immortal, 1)
sourceClock.init(0L)
val sink = ProcessorDescription(id = 1, taskClass = null, parallelism = 1)
- val sinkClock = new ProcessorClock(1,LifeTime.Immortal, 1)
+ val sinkClock = new ProcessorClock(1, LifeTime.Immortal, 1)
sinkClock.init(0L)
val graph = Graph.empty[ProcessorDescription, PartitionerDescription]
graph.addVertex(source)
graph.addVertex(sink)
graph.addEdge(source, PartitionerDescription(null), sink)
val dag = DAG(graph)
- val clocks = Map (
+ val clocks = Map(
0 -> sourceClock,
1 -> sinkClock
)
@@ -199,30 +203,29 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
sourceClock.updateMinClock(0, 100L)
sinkClock.updateMinClock(0, 100L)
- // clock advance from 0 to 100, there is no stalling.
+ // Clock advances from 0 to 100, there is no stalling.
healthChecker.check(currentMinClock = 100, clocks, dag, 200)
healthChecker.getReport.stallingTasks shouldBe List.empty[TaskId]
- // clock not advancing.
- // pasted time exceed the stalling threshold, report stalling
+ // Clock not advancing.
+ // Pasted time exceed the stalling threshold, report stalling
healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
- // the source task is stalling the clock
+ // The source task is stalling the clock
healthChecker.getReport.stallingTasks shouldBe List(TaskId(0, 0))
- // advance the source clock
+ // Advance the source clock
sourceClock.updateMinClock(0, 101L)
healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
- // the sink task is stalling the clock
+ // The sink task is stalling the clock
healthChecker.getReport.stallingTasks shouldBe List(TaskId(1, 0))
}
}
-
}
object ClockServiceSpec {
- class Store extends AppDataStore{
+ class Store extends AppDataStore {
private var map = Map.empty[String, Any]
@@ -231,7 +234,7 @@ object ClockServiceSpec {
Promise.successful(value).future
}
- def get(key: String) : Future[Any] = {
+ def get(key: String): Future[Any] = {
Promise.successful(map.get(key).getOrElse(null)).future
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
index 2750cc5..fb633f9 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -18,8 +18,13 @@
package io.gearpump.streaming.appmaster
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
import akka.actor.{ActorSystem, Props}
import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
import io.gearpump.cluster.{TestUtil, UserConfig}
import io.gearpump.partitioner.{HashPartitioner, Partitioner}
import io.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange}
@@ -27,7 +32,6 @@ import io.gearpump.streaming.task.{Subscriber, TaskActor}
import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, StreamApplication}
import io.gearpump.util.Graph
import io.gearpump.util.Graph._
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
@@ -94,8 +98,8 @@ class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
}
override def afterAll {
- system.shutdown()
- system.awaitTermination()
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
override def beforeAll {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
index 9121a22..a57a1ae 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,39 +18,43 @@
package io.gearpump.streaming.appmaster
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
import akka.actor._
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
-import io.gearpump.streaming.appmaster.ExecutorManager.ExecutorStarted
-import io.gearpump.{WorkerId, TestProbeUtil}
+import org.scalatest._
+
+import io.gearpump.TestProbeUtil
import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo}
import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
+import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo}
import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.jarstore.FilePath
import io.gearpump.streaming.ExecutorId
import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
-import io.gearpump.streaming.appmaster.ExecutorManager._
+import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, _}
import io.gearpump.streaming.appmaster.ExecutorManagerSpec.StartExecutorActorPlease
import io.gearpump.util.ActorSystemBooter.BindLifeCycle
import io.gearpump.util.LogUtil
-import org.scalatest._
-class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
implicit var system: ActorSystem = null
private val LOG = LogUtil.getLogger(getClass)
private val appId = 0
private val resource = Resource(10)
- override def beforeAll = {
+ override def beforeAll(): Unit = {
system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
}
- override def afterAll = {
- system.shutdown()
- system.awaitTermination()
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
private def startExecutorSystems = {
@@ -69,17 +73,18 @@ class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll
executor.ref ! StartExecutorActorPlease
TestProbeUtil.toProps(executor)
}
- val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext, executorFactory, ConfigFactory.empty, appName)))
+ val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext,
+ executorFactory, ConfigFactory.empty, appName)))
taskManager.send(executorManager, SetTaskManager(taskManager.ref))
val resourceRequest = Array(ResourceRequest(resource, WorkerId.unspecified))
- //start executors
+ // Starts executors
taskManager.send(executorManager, StartExecutors(resourceRequest, appJar.get))
- //ask master to start executor systems
+ // Asks master to start executor systems
import scala.concurrent.duration._
- val startExecutorSystem = master.receiveOne(5 seconds).asInstanceOf[StartExecutorSystems]
+ val startExecutorSystem = master.receiveOne(5.seconds).asInstanceOf[StartExecutorSystems]
assert(startExecutorSystem.resources == resourceRequest)
import startExecutorSystem.executorSystemConfig.{classPath, executorAkkaConfig, jar, jvmArguments, username => returnedUserName}
assert(startExecutorSystem.resources == resourceRequest)
@@ -94,7 +99,7 @@ class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll
}
it should "report timeout to taskManager" in {
- import ExecutorManager._
+ import io.gearpump.streaming.appmaster.ExecutorManager._
val (master, executor, taskManager, executorManager) = startExecutorSystems
master.reply(StartExecutorSystemTimeout)
taskManager.expectMsg(StartExecutorsTimeOut)
@@ -110,30 +115,31 @@ class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll
resource, workerInfo)
master.reply(ExecutorSystemStarted(executorSystem, None))
import scala.concurrent.duration._
- val bindLifeWith = executorSystemDaemon.receiveOne(3 seconds).asInstanceOf[BindLifeCycle]
+ val bindLifeWith = executorSystemDaemon.receiveOne(3.seconds).asInstanceOf[BindLifeCycle]
val proxyExecutor = bindLifeWith.actor
executor.expectMsg(StartExecutorActorPlease)
val executorId = 0
- //register executor
- executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId, resource, workerInfo))
+ // Registers executor
+ executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId,
+ resource, workerInfo))
taskManager.expectMsgType[ExecutorStarted]
- //broad message to childs
+ // Broadcasts message to childs
taskManager.send(executorManager, BroadCast("broadcast"))
executor.expectMsg("broadcast")
- //unicast
+ // Unicast
taskManager.send(executorManager, UniCast(executorId, "unicast"))
executor.expectMsg("unicast")
- //update executor resource status
+ // Updates executor resource status
val usedResource = Resource(5)
executorManager ! ExecutorResourceUsageSummary(Map(executorId -> usedResource))
worker.expectMsg(ChangeExecutorResource(appId, executorId, resource - usedResource))
- //watch for executor termination
+ // Watches for executor termination
system.stop(executor.ref)
LOG.info("Shutting down executor, and wait taskManager to get notified")
taskManager.expectMsg(ExecutorStopped(executorId))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
index 35659d6..9d4432a 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
@@ -15,12 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.streaming.appmaster
+import scala.concurrent.duration._
+
+import org.scalatest.{Matchers, WordSpec}
+
import io.gearpump.streaming.executor.ExecutorRestartPolicy
import io.gearpump.streaming.task.TaskId
-import org.scalatest.{Matchers, WordSpec}
-import scala.concurrent.duration._
class ExecutorRestartPolicySpec extends WordSpec with Matchers {
@@ -29,7 +32,8 @@ class ExecutorRestartPolicySpec extends WordSpec with Matchers {
val executorId1 = 1
val executorId2 = 2
val taskId = TaskId(0, 0)
- val executorSupervisor = new ExecutorRestartPolicy(maxNrOfRetries = 3, withinTimeRange = 1 seconds)
+ val executorSupervisor = new ExecutorRestartPolicy(
+ maxNrOfRetries = 3, withinTimeRange = 1.seconds)
executorSupervisor.addTaskToExecutor(executorId1, taskId)
assert(executorSupervisor.allowRestartExecutor(executorId1))
assert(executorSupervisor.allowRestartExecutor(executorId1))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
index d023be8..6cd70d9 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
package io.gearpump.streaming.appmaster
-import akka.actor.{Props, ActorSystem}
+import scala.concurrent.Await
+
+import akka.actor.{ActorSystem, Props}
import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
import io.gearpump.cluster.ClientToMaster.QueryHistoryMetrics
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem}
+import io.gearpump.cluster.MasterToClient.HistoryMetrics
import io.gearpump.cluster.TestUtil
-import io.gearpump.metrics.Metrics.{Histogram, Meter, Counter}
+import io.gearpump.metrics.Metrics.{Counter, Histogram, Meter}
import io.gearpump.util.HistoryMetricsService
-import HistoryMetricsService._
-import org.scalatest.{BeforeAndAfterEach, Matchers, FlatSpec}
+import io.gearpump.util.HistoryMetricsService._
-class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
val count = 2
val intervalMs = 10
@@ -44,30 +47,30 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf
val store = new SingleValueMetricsStore(count, intervalMs)
var now = 0L
- //only 1 data point will be kept in @intervalMs
+ // Only 1 data point will be kept in @intervalMs
store.add(Counter("count", 1), now)
store.add(Counter("count", 2), now)
now = now + intervalMs + 1
- //only 1 data point will be kept in @intervalMs
+ // Only 1 data point will be kept in @intervalMs
store.add(Counter("count", 3), now)
store.add(Counter("count", 4), now)
now = now + intervalMs + 1
- //only 1 data point will be kept in @intervalMs
- //expire oldest data point, because we only keep @count records
+ // Only 1 data point will be kept in @intervalMs
+ // expire oldest data point, because we only keep @count records
store.add(Counter("count", 5), now)
store.add(Counter("count", 6), now)
val result = store.read
assert(result.size == count)
- //the oldest value is expired
+ // The oldest value is expired
assert(result.head.value.asInstanceOf[Counter].value == 3L)
- //the newest value is inserted
+ // The newest value is inserted
assert(result.last.value.asInstanceOf[Counter].value == 5L)
}
@@ -119,7 +122,8 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf
assert(store.readHistory.map(_.value) == List(a))
}
- "HistoryMetricsService" should "retain lastest metrics data and allow user to query metrics by path" in {
+ "HistoryMetricsService" should
+ "retain lastest metrics data and allow user to query metrics by path" in {
implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
val appId = 0
val service = system.actorOf(Props(new HistoryMetricsService("app0", config)))
@@ -129,10 +133,10 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf
val client = TestProbe()
- // filter metrics with path "metric.counter"
+ // Filters metrics with path "metric.counter"
client.send(service, QueryHistoryMetrics("metric.counter"))
import scala.concurrent.duration._
- client.expectMsgPF(3 seconds) {
+ client.expectMsgPF(3.seconds) {
case history: HistoryMetrics =>
assert(history.path == "metric.counter")
val metricList = history.metrics
@@ -141,9 +145,9 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf
)
}
- // filter metrics with path "metric.meter"
+ // Filters metrics with path "metric.meter"
client.send(service, QueryHistoryMetrics("metric.meter"))
- client.expectMsgPF(3 seconds) {
+ client.expectMsgPF(3.seconds) {
case history: HistoryMetrics =>
assert(history.path == "metric.meter")
val metricList = history.metrics
@@ -152,9 +156,9 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf
)
}
- // filter metrics with path "metric.histogram"
+ // Filters metrics with path "metric.histogram"
client.send(service, QueryHistoryMetrics("metric.histogram"))
- client.expectMsgPF(3 seconds) {
+ client.expectMsgPF(3.seconds) {
case history: HistoryMetrics =>
assert(history.path == "metric.histogram")
val metricList = history.metrics
@@ -163,10 +167,10 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf
)
}
- // filter metrics with path prefix "metric", all metrics which can
+ // Filters metrics with path prefix "metric", all metrics which can
// match the path prefix will be retained.
client.send(service, QueryHistoryMetrics("metric"))
- client.expectMsgPF(3 seconds) {
+ client.expectMsgPF(3.seconds) {
case history: HistoryMetrics =>
val metricList = history.metrics
@@ -179,7 +183,7 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf
case v: Counter => counterFound = true
case v: Meter => meterFound = true
case v: Histogram => histogramFound = true
- case _ => //skip
+ case _ => // Skip
}
)
@@ -187,8 +191,7 @@ class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAf
assert(counterFound && meterFound && histogramFound)
}
- system.shutdown()
- system.awaitTermination()
-
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
index 12128c4..b391196 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,29 +17,31 @@
*/
package io.gearpump.streaming.appmaster
+import scala.concurrent.{Await, Future}
+
import akka.actor.ActorSystem
-import com.typesafe.config.ConfigFactory
-import io.gearpump.WorkerId
-import io.gearpump.streaming.{ProcessorDescription, DAG}
-import io.gearpump.cluster.{TestUtil, AppJar}
+import org.scalatest.{Matchers, WordSpec}
+
import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{AppJar, TestUtil}
import io.gearpump.jarstore.FilePath
import io.gearpump.partitioner.{HashPartitioner, Partitioner}
-import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask2, TestTask1}
+import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
import io.gearpump.streaming.task.TaskId
-import io.gearpump.streaming._
+import io.gearpump.streaming.{DAG, ProcessorDescription, _}
import io.gearpump.util.Graph
import io.gearpump.util.Graph._
-import org.scalatest.{Matchers, WordSpec}
-
-import scala.concurrent.{Await, Future}
class JarSchedulerSpec extends WordSpec with Matchers {
val mockJar1 = AppJar("jar1", FilePath("path"))
val mockJar2 = AppJar("jar2", FilePath("path"))
- val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1, jar = mockJar1)
- val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1, jar = mockJar1)
- val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2, jar = mockJar2)
+ val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1,
+ jar = mockJar1)
+ val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1,
+ jar = mockJar1)
+ val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2,
+ jar = mockJar2)
val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
import scala.concurrent.duration._
@@ -49,37 +51,46 @@ class JarSchedulerSpec extends WordSpec with Matchers {
val system = ActorSystem("JarSchedulerSpec")
implicit val dispatcher = system.dispatcher
val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system)
- manager.setDag(dag, Future{0L})
+ manager.setDag(dag, Future {
+ 0L
+ })
val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified))
- val result = Await.result(manager.getRequestDetails(), 15 seconds)
+ val result = Await.result(manager.getResourceRequestDetails(), 15.seconds)
assert(result.length == 1)
assert(result.head.jar == mockJar1)
assert(result.head.requests.deep == requests.deep)
- val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0, Resource(2)), 15 seconds)
+ val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0,
+ Resource(2)), 15.seconds)
assert(tasks.contains(TaskId(0, 0)))
assert(tasks.contains(TaskId(1, 0)))
val newDag = replaceDAG(dag, 1, task3, 1)
- manager.setDag(newDag, Future{0})
- val requestDetails = Await.result(manager.getRequestDetails().map(_.sortBy(_.jar.name)), 15 seconds)
+ manager.setDag(newDag, Future {
+ 0
+ })
+ val requestDetails = Await.result(manager.getResourceRequestDetails().
+ map(_.sortBy(_.jar.name)), 15.seconds)
assert(requestDetails.length == 2)
assert(requestDetails.last.jar == mockJar2)
assert(requestDetails.last.requests.deep == requests.deep)
- system.shutdown()
- system.awaitTermination()
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
}
- def replaceDAG(dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int): DAG = {
- val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, newProcessor.life.birth)
+ def replaceDAG(
+ dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int)
+ : DAG = {
+ val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
+ newProcessor.life.birth)
val newProcessorMap = dag.processors ++
- Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
- newProcessor.id -> newProcessor)
+ Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
+ newProcessor.id -> newProcessor)
val newGraph = dag.graph.subGraph(oldProcessorId).
- replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
+ replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
new DAG(newVersion, newProcessorMap, newGraph)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
index c55be84..2e07def 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
package io.gearpump.streaming.appmaster
-import io.gearpump.WorkerId
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.streaming.appmaster.TaskLocator.Localities
import io.gearpump.streaming.task.TaskId
-import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
class TaskLocatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
it should "serialize/deserialize correctly" in {
- val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1,2))))
+ val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1, 2))))
Localities.toJson(localities)
localities.localities.mapValues(_.toList) shouldBe
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 8105df3..8153fce 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,17 @@
package io.gearpump.streaming.appmaster
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.TestProbe
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.cluster.{AppJar, TestUtil, UserConfig}
import io.gearpump.jarstore.FilePath
import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
@@ -39,11 +46,7 @@ import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId}
import io.gearpump.transport.HostPort
import io.gearpump.util.Graph
import io.gearpump.util.Graph._
-import io.gearpump.{WorkerId, Message, TimeStamp}
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import scala.concurrent.Future
+import io.gearpump.{Message, TimeStamp}
class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
@@ -73,8 +76,8 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
}
override def afterEach(): Unit = {
- system.shutdown()
- system.awaitTermination()
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
it should "recover by requesting new executors when executor stopped unexpectedly" in {
@@ -83,15 +86,18 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
implicit val dispatcher = system.dispatcher
val resourceRequest = Array(ResourceRequest(resource, workerId))
- when(scheduler.executorFailed(executorId)).thenReturn(Future{Some(ResourceRequestDetail(mockJar, resourceRequest))})
+ when(scheduler.executorFailed(executorId)).thenReturn(Future {
+ Some(ResourceRequestDetail(mockJar,
+ resourceRequest))
+ })
taskManager ! ExecutorStopped(executorId)
- // when one executor stop, it will also trigger the recovery by restart
+ // When one executor stop, it will also trigger the recovery by restart
// existing executors
executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
- // ask for new executors
+ // Asks for new executors
val returned = executorManager.receiveN(1).head.asInstanceOf[StartExecutors]
assert(returned.resources.deep == resourceRequest.deep)
executorManager.reply(StartExecutorsTimeOut)
@@ -110,7 +116,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
}
- import TaskManager.TaskChangeRegistry
+ import io.gearpump.streaming.appmaster.TaskManager.TaskChangeRegistry
"TaskChangeRegistry" should "track all modified task registration" in {
val tasks = List(TaskId(0, 0), TaskId(0, 1))
val registry = new TaskChangeRegistry(tasks)
@@ -155,24 +161,28 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
val dagManager = TestProbe()
val taskManager = system.actorOf(
- Props(new TaskManager(appId, dagManager.ref, scheduler, executorManager.ref, clockService.ref, appMaster.ref, "appName")))
+ Props(new TaskManager(appId, dagManager.ref, scheduler, executorManager.ref, clockService.ref,
+ appMaster.ref, "appName")))
dagManager.expectMsgType[WatchChange]
executorManager.expectMsgType[SetTaskManager]
- // step1: first transition from Unitialized to ApplicationReady
+ // Step1: first transition from Unitialized to ApplicationReady
executorManager.expectMsgType[ExecutorResourceUsageSummary]
dagManager.expectMsgType[NewDAGDeployed]
- // step2: Get Additional Resource Request
- when(scheduler.getRequestDetails())
- .thenReturn(Future{Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource, WorkerId.unspecified))))})
+ // Step2: Get Additional Resource Request
+ when(scheduler.getResourceRequestDetails())
+ .thenReturn(Future {
+ Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource,
+ WorkerId.unspecified))))
+ })
- // step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG
+ // Step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG
dagManager.expectMsg(GetLatestDAG)
dagManager.reply(LatestDAG(dag))
- // step4: Start remote Executors.
+ // Step4: Start remote Executors.
// received Broadcast
executorManager.expectMsg(BroadCast(StartDynamicDag(dag.version)))
executorManager.expectMsgType[StartExecutors]
@@ -180,10 +190,10 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
when(scheduler.scheduleTask(mockJar, workerId, executorId, resource))
.thenReturn(Future(List(TaskId(0, 0), TaskId(1, 0))))
- // step5: Executor is started.
+ // Step5: Executor is started.
executorManager.reply(ExecutorStarted(executorId, resource, workerId, Some(mockJar)))
- // step6: Prepare to start Task. First GetTaskLaunchData.
+ // Step6: Prepare to start Task. First GetTaskLaunchData.
val taskLaunchData: PartialFunction[Any, TaskLaunchData] = {
case GetTaskLaunchData(_, 0, executorStarted) =>
task1LaunchData.copy(context = executorStarted)
@@ -197,18 +207,17 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
val launchData2 = dagManager.expectMsgPF()(taskLaunchData)
dagManager.reply(launchData2)
- // step7: Launch Task
+ // Step7: Launch Task
val launchTaskMatch: PartialFunction[Any, RegisterTask] = {
case UniCast(executorId, launch: LaunchTasks) =>
- Console.println("Launch Task " + launch.processorDescription.id)
RegisterTask(launch.taskId.head, executorId, HostPort("127.0.0.1:3000"))
}
- // taskmanager should return the latest start clock to task(0,0)
+ // Taskmanager should return the latest start clock to task(0,0)
clockService.expectMsg(GetStartClock)
clockService.reply(StartClock(0))
- // step8: Task is started. registerTask.
+ // Step8: Task is started. registerTask.
val registerTask1 = executorManager.expectMsgPF()(launchTaskMatch)
taskManager.tell(registerTask1, executor.ref)
executor.expectMsgType[TaskRegistered]
@@ -217,53 +226,51 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
taskManager.tell(registerTask2, executor.ref)
executor.expectMsgType[TaskRegistered]
- // step9: start broadcasting TaskLocations.
+ // Step9: start broadcasting TaskLocations.
import scala.concurrent.duration._
- assert(executorManager.expectMsgPF(5 seconds) {
+ assert(executorManager.expectMsgPF(5.seconds) {
case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[TaskLocationsReady]
})
- //step10: Executor confirm it has received TaskLocationsReceived(version, executorId)
+ // Step10: Executor confirm it has received TaskLocationsReceived(version, executorId)
taskManager.tell(TaskLocationsReceived(dag.version, executorId), executor.ref)
-
- // step11: Tell ClockService to update DAG.
+ // Step11: Tell ClockService to update DAG.
clockService.expectMsgType[ChangeToNewDAG]
clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp]))
-
- //step12: start all tasks
+ // Step12: start all tasks
import scala.concurrent.duration._
- assert(executorManager.expectMsgPF(5 seconds) {
+ assert(executorManager.expectMsgPF(5.seconds) {
case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[StartAllTasks]
})
- // step13, Tell executor Manager the updated usage status of executors.
+ // Step13, Tell executor Manager the updated usage status of executors.
executorManager.expectMsgType[ExecutorResourceUsageSummary]
- // step14: transition from DynamicDAG to ApplicationReady
+ // Step14: transition from DynamicDAG to ApplicationReady
Env(executorManager, clockService, appMaster, executor, taskManager, scheduler)
}
}
object TaskManagerSpec {
case class Env(
- executorManager: TestProbe,
- clockService: TestProbe,
- appMaster: TestProbe,
- executor: TestProbe,
- taskManager: ActorRef,
- scheduler: JarScheduler)
-
- class Task1(taskContext : TaskContext, userConf : UserConfig)
+ executorManager: TestProbe,
+ clockService: TestProbe,
+ appMaster: TestProbe,
+ executor: TestProbe,
+ taskManager: ActorRef,
+ scheduler: JarScheduler)
+
+ class Task1(taskContext: TaskContext, userConf: UserConfig)
extends Task(taskContext, userConf) {
- override def onStart(startTime: StartTime): Unit = ???
- override def onNext(msg: Message): Unit = ???
+ override def onStart(startTime: StartTime): Unit = {}
+ override def onNext(msg: Message): Unit = {}
}
- class Task2 (taskContext : TaskContext, userConf : UserConfig)
+ class Task2(taskContext: TaskContext, userConf: UserConfig)
extends Task(taskContext, userConf) {
- override def onStart(startTime: StartTime): Unit = ???
- override def onNext(msg: Message): Unit = ???
+ override def onStart(startTime: StartTime): Unit = {}
+ override def onNext(msg: Message): Unit = {}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
index ecac824..e8417ea 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
@@ -18,12 +18,12 @@
package io.gearpump.streaming.appmaster
-import io.gearpump.streaming.appmaster.TaskRegistry.{Reject, Accept}
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
import io.gearpump.cluster.scheduler.Resource
import io.gearpump.streaming.appmaster.TaskRegistry.{Accept, Reject, TaskLocation, TaskLocations}
import io.gearpump.streaming.task.TaskId
import io.gearpump.transport.HostPort
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
class TaskRegistrySpec extends FlatSpec with Matchers with BeforeAndAfterEach {
it should "maintain registered tasks" in {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index d2373ea..2c64133 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,23 +17,22 @@
*/
package io.gearpump.streaming.appmaster
+import scala.collection.mutable.ArrayBuffer
+
import com.typesafe.config.ConfigFactory
-import io.gearpump.streaming.Constants
-import io.gearpump.streaming.appmaster.TaskLocator.Localities
-import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
-import io.gearpump.{WorkerId, Message}
+import org.scalatest.{Matchers, WordSpec}
+
+import io.gearpump.Message
import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
-import io.gearpump.cluster.{TestUtil, ClusterConfig, UserConfig}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{TestUtil, UserConfig}
import io.gearpump.partitioner.{HashPartitioner, Partitioner}
import io.gearpump.streaming.appmaster.TaskLocator.Localities
import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import io.gearpump.streaming.{DAG, ProcessorDescription}
+import io.gearpump.streaming.{Constants, DAG, ProcessorDescription}
import io.gearpump.util.Graph
import io.gearpump.util.Graph._
-import org.scalatest.{Matchers, WordSpec}
-
-import scala.collection.mutable.ArrayBuffer
class TaskSchedulerSpec extends WordSpec with Matchers {
val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 4)
@@ -47,19 +46,19 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
"schedule tasks on different workers properly according user's configuration" in {
val localities = Localities(
- Map(WorkerId(1, 0L) -> Array(TaskId(0,0), TaskId(0,1), TaskId(1,0), TaskId(1,1)),
- WorkerId(2, 0L) -> Array(TaskId(0,2), TaskId(0,3))
- ))
+ Map(WorkerId(1, 0L) -> Array(TaskId(0, 0), TaskId(0, 1), TaskId(1, 0), TaskId(1, 1)),
+ WorkerId(2, 0L) -> Array(TaskId(0, 2), TaskId(0, 3))
+ ))
val localityConfig = ConfigFactory.parseString(Localities.toJson(localities))
- import Constants.GEARPUMP_STREAMING_LOCALITIES
+ import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
val appName = "app"
val taskScheduler = new TaskSchedulerImpl(appId = 0, appName,
config.withValue(s"$GEARPUMP_STREAMING_LOCALITIES.$appName", localityConfig.root))
val expectedRequests =
- Array( ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
+ Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER))
taskScheduler.setDAG(dag)
@@ -71,28 +70,32 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
val tasksOnWorker1 = ArrayBuffer[Int]()
val tasksOnWorker2 = ArrayBuffer[Int]()
for (i <- 0 until 4) {
- tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(1)).head.processorId)
+ tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L),
+ executorId = 0, Resource(1)).head.processorId)
}
for (i <- 0 until 2) {
- tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1, Resource(1)).head.processorId)
+ tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1,
+ Resource(1)).head.processorId)
}
- //allocate more resource, and no tasks to launch
- assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(1)) == List.empty[TaskId])
+ // Allocates more resource, and no tasks to launch
+ assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3,
+ Resource(1)) == List.empty[TaskId])
- //on worker1, executor 0
+ // On worker1, executor 0
assert(tasksOnWorker1.sorted.sameElements(Array(0, 0, 1, 1)))
- //on worker2, executor 1, Task(0, 0), Task(0, 1)
+ // On worker2, executor 1, Task(0, 0), Task(0, 1)
assert(tasksOnWorker2.sorted.sameElements(Array(0, 0)))
val rescheduledResources = taskScheduler.executorFailed(executorId = 1)
- assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2), WorkerId.unspecified, relaxation = Relaxation.ONEWORKER))))
+ assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2),
+ WorkerId.unspecified, relaxation = Relaxation.ONEWORKER))))
val launchedTask = taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(2))
- //start the failed 2 tasks Task(0, 0) and Task(0, 1)
+ // Starts the failed 2 tasks Task(0, 0) and Task(0, 1)
assert(launchedTask.length == 2)
}
@@ -101,7 +104,7 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, config)
val expectedRequests =
- Array( ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
+ Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER))
taskScheduler.setDAG(dag)
@@ -112,16 +115,16 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
}
}
-object TaskSchedulerSpec{
- class TestTask1(taskContext : TaskContext, userConf : UserConfig)
- extends Task(taskContext, userConf) {
- override def onStart(startTime: StartTime): Unit = ???
- override def onNext(msg: Message): Unit = ???
+object TaskSchedulerSpec {
+ class TestTask1(taskContext: TaskContext, userConf: UserConfig)
+ extends Task(taskContext, userConf) {
+ override def onStart(startTime: StartTime): Unit = Unit
+ override def onNext(msg: Message): Unit = Unit
}
- class TestTask2(taskContext : TaskContext, userConf : UserConfig)
- extends Task(taskContext, userConf) {
- override def onStart(startTime: StartTime): Unit = ???
- override def onNext(msg: Message): Unit = ???
+ class TestTask2(taskContext: TaskContext, userConf: UserConfig)
+ extends Task(taskContext, userConf) {
+ override def onStart(startTime: StartTime): Unit = Unit
+ override def onNext(msg: Message): Unit = Unit
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
index 89d2d64..132d46c 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,28 +18,30 @@
package io.gearpump.streaming.dsl
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
import akka.actor.ActorSystem
-import io.gearpump.streaming.dsl.plan.OpTranslator.SourceTask
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.streaming.dsl.plan.OpTranslator._
import org.mockito.Mockito.when
import org.scalatest._
import org.scalatest.mock.MockitoSugar
-class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+
+import io.gearpump.cluster.TestUtil
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.streaming.dsl.plan.OpTranslator.SourceTask
+class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
implicit var system: ActorSystem = null
- override def beforeAll: Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+ override def beforeAll(): Unit = {
+ system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
}
- override def afterAll: Unit = {
- system.shutdown()
- system.awaitTermination()
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
-
it should "be able to generate multiple new streams" in {
val context: ClientContext = mock[ClientContext]
when(context.system).thenReturn(system)
@@ -57,7 +59,7 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with
val app = StreamApp("dsl", context)
val parallism = 3
- app.source(List("A","B","C"), parallism, "").flatMap(Array(_)).reduce(_+_)
+ app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _)
val task = app.plan.dag.vertices.iterator.next()
assert(task.taskClass == classOf[SourceTask[_, _]].getName)
assert(task.parallelism == parallism)
@@ -72,7 +74,7 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with
"1",
"2"
)
- val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_+_)
+ val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _)
val task = app.plan.dag.vertices.iterator.next()
/*
val task = app.plan.dag.vertices.iterator.map(desc => {