You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:14 UTC
[04/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala
new file mode 100644
index 0000000..616894a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.transaction.api
+
+import org.apache.gearpump.{Message, TimeStamp}
+
+import scala.util.Try
+
+/**
+ * Filters offsets and stores the mapping from timestamp to offset
+ */
+trait MessageFilter {
+ def filter(messageAndOffset: (Message, Long)): Option[Message]
+}
+
+/**
+ * Resolves timestamp to offset by looking up the underlying storage
+ */
+trait OffsetTimeStampResolver {
+ def resolveOffset(time: TimeStamp): Try[Long]
+}
+
+/**
+ * Manages message's offset on TimeReplayableSource and timestamp
+ */
+trait OffsetManager extends MessageFilter with OffsetTimeStampResolver {
+ def close(): Unit
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala
new file mode 100644
index 0000000..40fc088
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.transaction.api
+
+import org.apache.gearpump.TimeStamp
+
+import scala.util.Try
+
+object OffsetStorage {
+
+ /**
+ * StorageEmpty means no data has been stored
+ */
+ case object StorageEmpty extends Throwable
+
+ /**
+ * Overflow means the looked up time is
+ * larger than the maximum stored TimeStamp
+ */
+ case class Overflow(maxTimestamp: Array[Byte]) extends Throwable
+
+ /**
+ * Underflow means the looked up time is
+ * smaller than the minimum stored TimeStamp
+ */
+ case class Underflow(minTimestamp: Array[Byte]) extends Throwable
+}
+
+/**
+ * OffsetStorage stores the mapping from TimeStamp to Offset
+ */
+trait OffsetStorage {
+ /**
+ * Tries to look up the time in the OffsetStorage. Returns the corresponding Offset if the
+ * time is in the range of stored TimeStamps, or one of the failure infos (StorageEmpty,
+ * Overflow, Underflow) otherwise.
+ *
+ * @param time the time to look for
+ * @return the corresponding offset if the time is in the range, otherwise failure
+ */
+ def lookUp(time: TimeStamp): Try[Array[Byte]]
+
+ def append(time: TimeStamp, offset: Array[Byte]): Unit
+
+ def close(): Unit
+}
+
+trait OffsetStorageFactory extends java.io.Serializable {
+ def getOffsetStorage(dir: String): OffsetStorage
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
new file mode 100644
index 0000000..16f98d5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.transaction.api
+
+import org.apache.gearpump.streaming.source.DataSource
+
+/**
+ * AT-LEAST-ONCE API. Represents a data source which allows replaying.
+ *
+ * Subclass should be able to replay messages on recovery from the time
+ * when an application crashed.
+ */
+trait TimeReplayableSource extends DataSource
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
new file mode 100644
index 0000000..2ddca3a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.transaction.api
+
+import org.apache.gearpump.{Message, TimeStamp}
+
+/**
+ * TimeStampFilter filters out messages that are obsolete.
+ */
+trait TimeStampFilter extends java.io.Serializable {
+ def filter(msg: Message, predicate: TimeStamp): Option[Message]
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala
new file mode 100644
index 0000000..97c9385
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.util
+
+import akka.actor.{ActorPath, ActorRef}
+
+import org.apache.gearpump.streaming.task.TaskId
+
+object ActorPathUtil {
+
+ def executorActorName(executorId: Int): String = executorId.toString
+
+ def taskActorName(taskId: TaskId): String = {
+ s"processor_${taskId.processorId}_task_${taskId.index}"
+ }
+
+ def taskActorPath(appMaster: ActorRef, executorId: Int, taskId: TaskId): ActorPath = {
+ val executorManager = appMaster.path.child(executorManagerActorName)
+ val executor = executorManager.child(executorActorName(executorId))
+ val task = executor.child(taskActorName(taskId))
+ task
+ }
+
+ def executorManagerActorName: String = "executors"
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
deleted file mode 100644
index 13b8e34..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph.Node
-
-class DAGSpec extends PropSpec with PropertyChecks with Matchers {
-
- val parallelismGen = Gen.chooseNum[Int](1, 100)
-
- property("DAG should be built correctly for a single task") {
- forAll(parallelismGen) { (parallelism: Int) =>
- val task = ProcessorDescription(id = 0, taskClass = "task", parallelism = parallelism)
- val graph = Graph[ProcessorDescription, PartitionerDescription](task)
- val dag = DAG(graph)
- dag.processors.size shouldBe 1
- assert(dag.taskCount == parallelism)
- dag.tasks.sortBy(_.index) shouldBe (0 until parallelism).map(index => TaskId(0, index))
- dag.graph.edges shouldBe empty
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
deleted file mode 100644
index 7938415..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming
-
-import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers}
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.streaming.task._
-import io.gearpump.transport.netty.WrappedChannelBuffer
-
-class MessageSerializerSpec extends WordSpec with Matchers {
-
- def testSerializer[T](obj: T, taskMessageSerializer: TaskMessageSerializer[T]): T = {
- val length = taskMessageSerializer.getLength(obj)
- val bout = new ChannelBufferOutputStream(ChannelBuffers.buffer(length))
- taskMessageSerializer.write(bout, obj)
- val input = new WrappedChannelBuffer(ChannelBuffers.wrappedBuffer(bout.buffer().array()))
- taskMessageSerializer.read(input)
- }
-
- "SerializedMessageSerializer" should {
- "serialize and deserialize SerializedMessage properly" in {
- val serializer = new SerializedMessageSerializer
- val data = new Array[Byte](256)
- new java.util.Random().nextBytes(data)
- val msg = SerializedMessage(1024, data)
- val result = testSerializer(msg, serializer)
- assert(result.timeStamp == msg.timeStamp && result.bytes.sameElements(msg.bytes))
- }
- }
-
- "TaskIdSerializer" should {
- "serialize and deserialize TaskId properly" in {
- val taskIdSerializer = new TaskIdSerializer
- val taskId = TaskId(1, 3)
- assert(testSerializer(taskId, taskIdSerializer).equals(taskId))
- }
- }
-
- "AckRequestSerializer" should {
- "serialize and deserialize AckRequest properly" in {
- val serializer = new AckRequestSerializer
- val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024)
- assert(testSerializer(ackRequest, serializer).equals(ackRequest))
- }
- }
-
- "InitialAckRequestSerializer" should {
- "serialize and deserialize AckRequest properly" in {
- val serializer = new InitialAckRequestSerializer
- val ackRequest = InitialAckRequest(TaskId(1, 2), 1024)
- assert(testSerializer(ackRequest, serializer).equals(ackRequest))
- }
- }
-
- "AckSerializer" should {
- "serialize and deserialize Ack properly" in {
- val serializer = new AckSerializer
- val ack = Ack(TaskId(1, 2), 1024, 1023, 1799)
- assert(testSerializer(ack, serializer).equals(ack))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
deleted file mode 100644
index 40310f7..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming
-
-import akka.actor.{Actor, ActorSystem}
-import akka.testkit.TestActorRef
-import org.mockito.{ArgumentMatcher, Matchers, Mockito}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.streaming.task.{TaskContext, TaskId}
-
-object MockUtil {
-
- lazy val system: ActorSystem = ActorSystem("mockUtil", TestUtil.DEFAULT_CONFIG)
-
- def mockTaskContext: TaskContext = {
- val context = Mockito.mock(classOf[TaskContext])
- Mockito.when(context.self).thenReturn(Mockito.mock(classOf[TestActorRef[Actor]]))
- Mockito.when(context.system).thenReturn(system)
- Mockito.when(context.parallelism).thenReturn(1)
- Mockito.when(context.taskId).thenReturn(TaskId(0, 0))
- context
- }
-
- def argMatch[T](func: T => Boolean): T = {
- Matchers.argThat(new ArgumentMatcher[T] {
- override def matches(param: Any): Boolean = {
- val mesage = param.asInstanceOf[T]
- func(mesage)
- }
- })
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
deleted file mode 100644
index 2ea8b84..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming
-
-import akka.actor._
-import akka.testkit.TestActorRef
-
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster.appmaster.AppMasterRuntimeInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.{AppDescription, AppMasterContext, MiniCluster, UserConfig}
-import io.gearpump.streaming.appmaster.AppMaster
-import io.gearpump.util.Graph
-
-object StreamingTestUtil {
- private var executorId = 0
- val testUserName = "testuser"
-
- def startAppMaster(miniCluster: MiniCluster, appId: Int): TestActorRef[AppMaster] = {
-
- implicit val actorSystem = miniCluster.system
- val masterConf = AppMasterContext(appId, testUserName, Resource(1), null,
- None, miniCluster.mockMaster, AppMasterRuntimeInfo(appId, appName = appId.toString))
-
- val app = StreamApplication("test", Graph.empty, UserConfig.empty)
- val appDescription = AppDescription(app.name, app.appMaster.getName, app.userConfig)
- val props = Props(new AppMaster(masterConf, appDescription))
- val appMaster = miniCluster.launchActor(props).asInstanceOf[TestActorRef[AppMaster]]
- val registerAppMaster = RegisterAppMaster(appMaster, masterConf.registerData)
- miniCluster.mockMaster.tell(registerAppMaster, appMaster)
-
- appMaster
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
deleted file mode 100644
index 90fb8ab..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.duration._
-
-import akka.actor.{ActorRef, Props}
-import akka.testkit.{TestActorRef, TestProbe}
-import org.scalatest._
-
-import io.gearpump.Message
-import io.gearpump.cluster.AppMasterToMaster._
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.ClientToMaster.ShutdownApplication
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated}
-import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.jarstore.FilePath
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.task.{StartTime, TaskContext, _}
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
- protected override def config = TestUtil.DEFAULT_CONFIG
-
- var appMaster: ActorRef = null
-
- val appId = 0
- val workerId = WorkerId(1, 0L)
- val resource = Resource(1)
- val taskDescription1 = Processor[TaskA](2)
- val taskDescription2 = Processor[TaskB](2)
- val partitioner = new HashPartitioner
- var conf: UserConfig = null
-
- var mockTask: TestProbe = null
-
- var mockMaster: TestProbe = null
- var mockMasterProxy: ActorRef = null
-
- var mockWorker: TestProbe = null
- var appDescription: AppDescription = null
- var appMasterContext: AppMasterContext = null
- var appMasterRuntimeInfo: AppMasterRuntimeInfo = null
-
- override def beforeEach(): Unit = {
- startActorSystem()
-
- mockTask = TestProbe()(getActorSystem)
-
- mockMaster = TestProbe()(getActorSystem)
- mockWorker = TestProbe()(getActorSystem)
- mockMaster.ignoreMsg(ignoreSaveAppData)
- appMasterRuntimeInfo = AppMasterRuntimeInfo(appId, appName = appId.toString)
-
- implicit val system = getActorSystem
- conf = UserConfig.empty.withValue(AppMasterSpec.MASTER, mockMaster.ref)
- val mockJar = AppJar("for_test", FilePath("path"))
- appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar),
- mockMaster.ref, appMasterRuntimeInfo)
- val graph = Graph(taskDescription1 ~ partitioner ~> taskDescription2)
- val streamApp = StreamApplication("test", graph, conf)
- appDescription = Application.ApplicationToAppDescription(streamApp)
- import scala.concurrent.duration._
- mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path),
- 30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
- TestActorRef[AppMaster](
- AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), appDescription,
- appMasterContext))(getActorSystem)
-
- val registerAppMaster = mockMaster.receiveOne(15.seconds)
- assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
- appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
-
- mockMaster.reply(AppMasterRegistered(appId))
- mockMaster.expectMsg(15.seconds, GetAppData(appId, "DAG"))
- mockMaster.reply(GetAppDataResult("DAG", null))
- mockMaster.expectMsg(15.seconds, GetAppData(appId, "startClock"))
-
- mockMaster.reply(GetAppDataResult("startClock", 0L))
-
- mockMaster.expectMsg(15.seconds, RequestResource(appId, ResourceRequest(Resource(4),
- workerId = WorkerId.unspecified)))
- }
-
- override def afterEach(): Unit = {
- shutdownActorSystem()
- }
-
- "AppMaster" should {
- "kill it self when allocate resource time out" in {
- mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2),
- mockWorker.ref, workerId))))
- mockMaster.expectMsg(60.seconds, ShutdownApplication(appId))
- }
-
- "reschedule the resource when the worker reject to start executor" in {
- val resource = Resource(4)
- mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource,
- mockWorker.ref, workerId))))
- mockWorker.expectMsgClass(classOf[LaunchExecutor])
- mockWorker.reply(ExecutorLaunchRejected(""))
- mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)))
- }
-
- "find a new master when lost connection with master" in {
-
- val watcher = TestProbe()(getActorSystem)
- watcher.watch(mockMasterProxy)
- getActorSystem.stop(mockMasterProxy)
- watcher.expectTerminated(mockMasterProxy)
- // Make sure the parent of mockMasterProxy has received the Terminated message.
- // Issus address: https://github.com/gearpump/gearpump/issues/1919
- Thread.sleep(2000)
-
- import scala.concurrent.duration._
- mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path),
- 30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
- mockMaster.expectMsgClass(15.seconds, classOf[RegisterAppMaster])
- }
-
- // // TODO: This test is failing on Travis randomly
- // // We have not identifed the root cause.
- // // Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843
- // // Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733
- //
- // "launch executor and task properly" in {
- // mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref,
- // workerId))))
- // mockWorker.expectMsgClass(classOf[LaunchExecutor])
- //
- // val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG)
- // mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString))
- // for (i <- 1 to 4) {
- // mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted)
- // }
- //
- // // clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0
- // appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
- //
- // // there is no further upstream, so the upstreamMinClock is Long.MaxValue
- // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
- //
- // // check min clock
- // appMaster.tell(GetLatestMinClock, mockTask.ref)
- // mockTask.expectMsg(LatestMinClock(0))
- //
- //
- // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0
- // appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
- //
- // // there is no further upstream, so the upstreamMinClock is Long.MaxValue
- // mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
- //
- // // check min clock
- // appMaster.tell(GetLatestMinClock, mockTask.ref)
- // mockTask.expectMsg(LatestMinClock(0))
- //
- // // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0
- // appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
- //
- // // Min clock of processor 0 (Task(0, 0) and Task(0, 1))
- // mockTask.expectMsg(UpstreamMinClock(1))
- //
- // // check min clock
- // appMaster.tell(GetLatestMinClock, mockTask.ref)
- // mockTask.expectMsg(LatestMinClock(0))
- //
- // // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1
- // appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
- //
- // // min clock of processor 0 (Task(0, 0) and Task(0, 1))
- // mockTask.expectMsg(UpstreamMinClock(1))
- //
- // // check min clock
- // appMaster.tell(GetLatestMinClock, mockTask.ref)
- // mockTask.expectMsg(LatestMinClock(1))
- //
- // // shutdown worker and all executor on this work, expect appmaster to ask
- // // for new resources
- // workerSystem.shutdown()
- // mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation =
- // Relaxation.ONEWORKER)))
- // }
- }
-
- def ignoreSaveAppData: PartialFunction[Any, Boolean] = {
- case msg: SaveAppData => true
- }
-}
-
-object AppMasterSpec {
- val MASTER = "master"
- case object TaskStarted
-
- val MOCK_MASTER_PROXY = "mockMasterProxy"
-}
-
-class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
- val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
- override def onStart(startTime: StartTime): Unit = {
- master ! AppMasterSpec.TaskStarted
- }
-
- override def onNext(msg: Message): Unit = {}
-}
-
-class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
- val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
- override def onStart(startTime: StartTime): Unit = {
- master ! AppMasterSpec.TaskStarted
- }
-
- override def onNext(msg: Message): Unit = {}
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
deleted file mode 100644
index 01744a3..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.{Future, Promise}
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
-import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock}
-import io.gearpump.streaming.appmaster.ClockServiceSpec.Store
-import io.gearpump.streaming.storage.AppDataStore
-import io.gearpump.streaming.task.{GetLatestMinClock, GetStartClock, UpstreamMinClock, _}
-import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
- with WordSpecLike with Matchers with BeforeAndAfterAll {
-
- def this() = this(ActorSystem("ClockServiceSpec", TestUtil.DEFAULT_CONFIG))
-
- val hash = Partitioner[HashPartitioner]
- val task1 = ProcessorDescription(id = 0, taskClass = classOf[TaskActor].getName, parallelism = 1)
- val task2 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1)
- val dag = DAG(Graph(task1 ~ hash ~> task2))
-
- override def afterAll {
- TestKit.shutdownActorSystem(system)
- }
-
- "The ClockService" should {
- "maintain a global view of message timestamp in the application" in {
- val store = new Store()
- val startClock = 100L
- store.put(ClockService.START_CLOCK, startClock)
- val clockService = system.actorOf(Props(new ClockService(dag, store)))
- clockService ! GetLatestMinClock
- expectMsg(LatestMinClock(startClock))
-
- // task(0,0): clock(101); task(1,0): clock(100)
- clockService ! UpdateClock(TaskId(0, 0), 101)
-
- // There is no upstream, so pick Long.MaxValue
- expectMsg(UpstreamMinClock(Long.MaxValue))
-
- // Min clock is updated
- clockService ! GetLatestMinClock
- expectMsg(LatestMinClock(100))
-
- // task(0,0): clock(101); task(1,0): clock(101)
- clockService ! UpdateClock(TaskId(1, 0), 101)
-
- // Upstream is Task(0, 0), 101
- expectMsg(UpstreamMinClock(101))
-
- // Min clock is updated
- clockService ! GetLatestMinClock
- expectMsg(LatestMinClock(101))
- }
-
- "act on ChangeToNewDAG and make sure downstream clock smaller than upstreams" in {
- val store = new Store()
- val startClock = 100L
- store.put(ClockService.START_CLOCK, startClock)
- val clockService = system.actorOf(Props(new ClockService(dag, store)))
- val task = TestProbe()
- clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref)
- task.expectMsgType[UpstreamMinClock]
-
- val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
- parallelism = 1)
- val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName,
- parallelism = 1)
- val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName,
- parallelism = 1)
- val dagAddMiddleNode = DAG(Graph(
- task1 ~ hash ~> task2,
- task1 ~ hash ~> task3,
- task3 ~ hash ~> task2,
- task2 ~ hash ~> task4,
- task5 ~ hash ~> task1
- ))
- val user = TestProbe()
- clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref)
-
- val clocks = user.expectMsgPF() {
- case ChangeToNewDAGSuccess(clocks) =>
- clocks
- }
-
- // For intermediate task, pick its upstream as initial clock
- assert(clocks(task3.id) == clocks(task1.id))
-
- // For sink task, pick its upstream as initial clock
- assert(clocks(task4.id) == clocks(task2.id))
-
- // For source task, set the initial clock as startClock
- assert(clocks(task5.id) == startClock)
- }
-
- "maintain global checkpoint time" in {
- val store = new Store()
- val startClock = 100L
- store.put(ClockService.START_CLOCK, startClock)
- val clockService = system.actorOf(Props(new ClockService(dag, store)))
- clockService ! UpdateClock(TaskId(0, 0), 200L)
- expectMsgType[UpstreamMinClock]
- clockService ! UpdateClock(TaskId(1, 0), 200L)
- expectMsgType[UpstreamMinClock]
-
- clockService ! GetStartClock
- expectMsg(StartClock(200L))
-
- val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", true)
- val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
- parallelism = 1, taskConf = conf)
- val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName,
- parallelism = 1, taskConf = conf)
- val dagWithStateTasks = DAG(Graph(
- task1 ~ hash ~> task2,
- task1 ~ hash ~> task3,
- task3 ~ hash ~> task2,
- task2 ~ hash ~> task4
- ), version = 1)
-
- val taskId3 = TaskId(3, 0)
- val taskId4 = TaskId(4, 0)
-
- clockService ! ChangeToNewDAG(dagWithStateTasks)
- expectMsgType[ChangeToNewDAGSuccess]
-
- clockService ! ReportCheckpointClock(taskId3, startClock)
- clockService ! ReportCheckpointClock(taskId4, startClock)
- clockService ! GetStartClock
- expectMsg(StartClock(startClock))
-
- clockService ! ReportCheckpointClock(taskId3, 200L)
- clockService ! ReportCheckpointClock(taskId4, 300L)
- clockService ! GetStartClock
- expectMsg(StartClock(startClock))
-
- clockService ! ReportCheckpointClock(taskId3, 300L)
- clockService ! GetStartClock
- expectMsg(StartClock(300L))
- }
- }
-
- "ProcessorClock" should {
- "maintain the min clock of current processor" in {
- val processorId = 0
- val parallism = 3
- val clock = new ProcessorClock(processorId, LifeTime.Immortal, parallism)
- clock.init(100L)
- clock.updateMinClock(0, 101)
- assert(clock.min == 100L)
-
- clock.updateMinClock(1, 102)
- assert(clock.min == 100L)
-
- clock.updateMinClock(2, 103)
- assert(clock.min == 101L)
- }
- }
-
- "HealthChecker" should {
- "report stalling if the clock is not advancing" in {
- val healthChecker = new HealthChecker(stallingThresholdSeconds = 1)
- val source = ProcessorDescription(id = 0, taskClass = null, parallelism = 1)
- val sourceClock = new ProcessorClock(0, LifeTime.Immortal, 1)
- sourceClock.init(0L)
- val sink = ProcessorDescription(id = 1, taskClass = null, parallelism = 1)
- val sinkClock = new ProcessorClock(1, LifeTime.Immortal, 1)
- sinkClock.init(0L)
- val graph = Graph.empty[ProcessorDescription, PartitionerDescription]
- graph.addVertex(source)
- graph.addVertex(sink)
- graph.addEdge(source, PartitionerDescription(null), sink)
- val dag = DAG(graph)
- val clocks = Map(
- 0 -> sourceClock,
- 1 -> sinkClock
- )
-
- sourceClock.updateMinClock(0, 100L)
- sinkClock.updateMinClock(0, 100L)
-
- // Clock advances from 0 to 100, there is no stalling.
- healthChecker.check(currentMinClock = 100, clocks, dag, 200)
- healthChecker.getReport.stallingTasks shouldBe List.empty[TaskId]
-
- // Clock not advancing.
- // Pasted time exceed the stalling threshold, report stalling
- healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
-
- // The source task is stalling the clock
- healthChecker.getReport.stallingTasks shouldBe List(TaskId(0, 0))
-
- // Advance the source clock
- sourceClock.updateMinClock(0, 101L)
- healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
- // The sink task is stalling the clock
- healthChecker.getReport.stallingTasks shouldBe List(TaskId(1, 0))
- }
- }
-}
-
-object ClockServiceSpec {
-
- class Store extends AppDataStore {
-
- private var map = Map.empty[String, Any]
-
- def put(key: String, value: Any): Future[Any] = {
- map = map + (key -> value)
- Promise.successful(value).future
- }
-
- def get(key: String): Future[Any] = {
- Promise.successful(map.get(key).getOrElse(null)).future
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
deleted file mode 100644
index fb633f9..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.partitioner.{HashPartitioner, Partitioner}
-import io.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange}
-import io.gearpump.streaming.task.{Subscriber, TaskActor}
-import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, StreamApplication}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
-
- val hash = Partitioner[HashPartitioner]
- val task1 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1)
- val task2 = ProcessorDescription(id = 2, taskClass = classOf[TaskActor].getName, parallelism = 1)
- val graph = Graph(task1 ~ hash ~> task2)
- val dag = DAG(graph)
- implicit var system: ActorSystem = null
- val appId = 0
- lazy val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, graph)
-
- "DagManager" should {
- import io.gearpump.streaming.appmaster.ClockServiceSpec.Store
- "maintain the dags properly" in {
- val store = new Store
-
- val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, store, Some(dag))))
- val client = TestProbe()
- client.send(dagManager, GetLatestDAG)
- client.expectMsg(LatestDAG(dag))
-
- client.send(dagManager, GetTaskLaunchData(dag.version, task1.id, null))
- val task1LaunchData = TaskLaunchData(task1, Subscriber.of(task1.id, dag))
- client.expectMsg(task1LaunchData)
-
- val task2LaunchData = TaskLaunchData(task2, Subscriber.of(task2.id, dag))
- client.send(dagManager, GetTaskLaunchData(dag.version, task2.id, null))
- client.expectMsg(task2LaunchData)
-
- val watcher = TestProbe()
- client.send(dagManager, WatchChange(watcher.ref))
- val task3 = task2.copy(id = 3, life = LifeTime(100, Long.MaxValue))
-
- client.send(dagManager, ReplaceProcessor(task2.id, task3))
- client.expectMsg(DAGOperationSuccess)
-
- client.send(dagManager, GetLatestDAG)
- val newDag = client.expectMsgPF() {
- case LatestDAG(dag) => dag
- }
- assert(newDag.processors.contains(task3.id))
- watcher.expectMsgType[LatestDAG]
-
- val task4 = task3.copy(id = 4)
- client.send(dagManager, ReplaceProcessor(task3.id, task4))
- client.expectMsgType[DAGOperationFailed]
-
- client.send(dagManager, NewDAGDeployed(newDag.version))
- client.send(dagManager, ReplaceProcessor(task3.id, task4))
- client.expectMsg(DAGOperationSuccess)
- }
-
- "retrieve last stored dag properly" in {
- val store = new Store
- val newGraph = Graph(task1 ~ hash ~> task2 ~> task2)
- val newDag = DAG(newGraph)
- store.put(StreamApplication.DAG, newDag)
- val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, store, Some(dag))))
- val client = TestProbe()
- client.send(dagManager, GetLatestDAG)
- client.expectMsg(LatestDAG(newDag))
- }
- }
-
- override def afterAll {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- override def beforeAll {
- this.system = ActorSystem("DagManagerSpec", TestUtil.DEFAULT_CONFIG)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
deleted file mode 100644
index a57a1ae..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor._
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import org.scalatest._
-
-import io.gearpump.TestProbeUtil
-import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
-import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo}
-import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.jarstore.FilePath
-import io.gearpump.streaming.ExecutorId
-import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
-import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, _}
-import io.gearpump.streaming.appmaster.ExecutorManagerSpec.StartExecutorActorPlease
-import io.gearpump.util.ActorSystemBooter.BindLifeCycle
-import io.gearpump.util.LogUtil
-
-class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
- implicit var system: ActorSystem = null
-
- private val LOG = LogUtil.getLogger(getClass)
- private val appId = 0
- private val resource = Resource(10)
-
- override def beforeAll(): Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- private def startExecutorSystems = {
- val master = TestProbe()
- val taskManager = TestProbe()
- val executor = TestProbe()
- val userConfig = UserConfig.empty
-
- val username = "user"
- val appName = "app"
- val appJar = Some(AppJar("for_test", FilePath("path")))
-
- val appMasterContext = AppMasterContext(appId, username, null, null, appJar, master.ref, null)
-
- val executorFactory = (_: ExecutorContext, _: UserConfig, _: Address, _: ExecutorId) => {
- executor.ref ! StartExecutorActorPlease
- TestProbeUtil.toProps(executor)
- }
- val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext,
- executorFactory, ConfigFactory.empty, appName)))
-
- taskManager.send(executorManager, SetTaskManager(taskManager.ref))
- val resourceRequest = Array(ResourceRequest(resource, WorkerId.unspecified))
-
- // Starts executors
- taskManager.send(executorManager, StartExecutors(resourceRequest, appJar.get))
-
- // Asks master to start executor systems
- import scala.concurrent.duration._
- val startExecutorSystem = master.receiveOne(5.seconds).asInstanceOf[StartExecutorSystems]
- assert(startExecutorSystem.resources == resourceRequest)
- import startExecutorSystem.executorSystemConfig.{classPath, executorAkkaConfig, jar, jvmArguments, username => returnedUserName}
- assert(startExecutorSystem.resources == resourceRequest)
-
- assert(classPath.length == 0)
- assert(jvmArguments.length == 0)
- assert(jar == appJar)
- assert(returnedUserName == username)
- assert(executorAkkaConfig.isEmpty)
-
- (master, executor, taskManager, executorManager)
- }
-
- it should "report timeout to taskManager" in {
- import io.gearpump.streaming.appmaster.ExecutorManager._
- val (master, executor, taskManager, executorManager) = startExecutorSystems
- master.reply(StartExecutorSystemTimeout)
- taskManager.expectMsg(StartExecutorsTimeOut)
- }
-
- it should "start executor actor correctly" in {
- val (master, executor, taskManager, executorManager) = startExecutorSystems
- val executorSystemDaemon = TestProbe()
- val worker = TestProbe()
- val workerId = WorkerId(0, 0L)
- val workerInfo = WorkerInfo(workerId, worker.ref)
- val executorSystem = ExecutorSystem(0, null, executorSystemDaemon.ref,
- resource, workerInfo)
- master.reply(ExecutorSystemStarted(executorSystem, None))
- import scala.concurrent.duration._
- val bindLifeWith = executorSystemDaemon.receiveOne(3.seconds).asInstanceOf[BindLifeCycle]
- val proxyExecutor = bindLifeWith.actor
- executor.expectMsg(StartExecutorActorPlease)
-
- val executorId = 0
-
- // Registers executor
- executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId,
- resource, workerInfo))
- taskManager.expectMsgType[ExecutorStarted]
-
- // Broadcasts message to childs
- taskManager.send(executorManager, BroadCast("broadcast"))
- executor.expectMsg("broadcast")
-
- // Unicast
- taskManager.send(executorManager, UniCast(executorId, "unicast"))
- executor.expectMsg("unicast")
-
- // Updates executor resource status
- val usedResource = Resource(5)
- executorManager ! ExecutorResourceUsageSummary(Map(executorId -> usedResource))
- worker.expectMsg(ChangeExecutorResource(appId, executorId, resource - usedResource))
-
- // Watches for executor termination
- system.stop(executor.ref)
- LOG.info("Shutting down executor, and wait taskManager to get notified")
- taskManager.expectMsg(ExecutorStopped(executorId))
- }
-}
-
-object ExecutorManagerSpec {
- case object StartExecutorActorPlease
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
deleted file mode 100644
index 9d4432a..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.duration._
-
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.streaming.executor.ExecutorRestartPolicy
-import io.gearpump.streaming.task.TaskId
-
-class ExecutorRestartPolicySpec extends WordSpec with Matchers {
-
- "ExecutorRestartPolicy" should {
- "decide whether to restart the executor" in {
- val executorId1 = 1
- val executorId2 = 2
- val taskId = TaskId(0, 0)
- val executorSupervisor = new ExecutorRestartPolicy(
- maxNrOfRetries = 3, withinTimeRange = 1.seconds)
- executorSupervisor.addTaskToExecutor(executorId1, taskId)
- assert(executorSupervisor.allowRestartExecutor(executorId1))
- assert(executorSupervisor.allowRestartExecutor(executorId1))
- executorSupervisor.addTaskToExecutor(executorId2, taskId)
- assert(executorSupervisor.allowRestartExecutor(executorId2))
- assert(!executorSupervisor.allowRestartExecutor(executorId2))
- Thread.sleep(1000)
- assert(executorSupervisor.allowRestartExecutor(executorId2))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
deleted file mode 100644
index 6cd70d9..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.Await
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.ClientToMaster.QueryHistoryMetrics
-import io.gearpump.cluster.MasterToClient.HistoryMetrics
-import io.gearpump.cluster.TestUtil
-import io.gearpump.metrics.Metrics.{Counter, Histogram, Meter}
-import io.gearpump.util.HistoryMetricsService
-import io.gearpump.util.HistoryMetricsService._
-
-class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
- val count = 2
- val intervalMs = 10
-
- val config = HistoryMetricsConfig(
- retainHistoryDataHours = 72,
- retainHistoryDataIntervalMs = 3600 * 1000,
- retainRecentDataSeconds = 300,
- retainRecentDataIntervalMs = 15 * 1000)
-
- "SingleValueMetricsStore" should "retain metrics and expire old value" in {
-
- val store = new SingleValueMetricsStore(count, intervalMs)
-
- var now = 0L
- // Only 1 data point will be kept in @intervalMs
- store.add(Counter("count", 1), now)
- store.add(Counter("count", 2), now)
-
- now = now + intervalMs + 1
-
- // Only 1 data point will be kept in @intervalMs
- store.add(Counter("count", 3), now)
- store.add(Counter("count", 4), now)
-
- now = now + intervalMs + 1
-
- // Only 1 data point will be kept in @intervalMs
- // expire oldest data point, because we only keep @count records
- store.add(Counter("count", 5), now)
- store.add(Counter("count", 6), now)
-
- val result = store.read
- assert(result.size == count)
-
- // The oldest value is expired
- assert(result.head.value.asInstanceOf[Counter].value == 3L)
-
- // The newest value is inserted
- assert(result.last.value.asInstanceOf[Counter].value == 5L)
- }
-
- val meterTemplate = Meter("meter", 0, 0, 0, "s")
-
- "HistogramMetricsStore" should "retain corse-grain history and fine-grain recent data" in {
- val store = new HistogramMetricsStore(config)
- val a = Histogram(null, 100, 0, 0, 0, 0, 0)
- val b = Histogram(null, 200, 0, 0, 0, 0, 0)
- val c = Histogram(null, 50, 0, 0, 0, 0, 0)
-
- store.add(a)
- store.add(b)
- store.add(c)
-
- assert(store.readLatest.map(_.value) == List(c))
- assert(store.readRecent.map(_.value) == List(a))
- assert(store.readHistory.map(_.value) == List(a))
- }
-
- "MeterMetricsStore" should "retain corse-grain history and fine-grain recent data" in {
- val store = new MeterMetricsStore(config)
-
- val a = Meter(null, 1, 100, 0, null)
- val b = Meter(null, 1, 200, 0, null)
- val c = Meter(null, 1, 50, 0, null)
-
- store.add(a)
- store.add(b)
- store.add(c)
-
- assert(store.readLatest.map(_.value) == List(c))
- assert(store.readRecent.map(_.value) == List(a))
- assert(store.readHistory.map(_.value) == List(a))
- }
-
- "CounterMetricsStore" should "retain corse-grain history and fine-grain recent data" in {
- val store = new CounterMetricsStore(config)
- val a = Counter(null, 50)
- val b = Counter(null, 100)
- val c = Counter(null, 150)
-
- store.add(a)
- store.add(b)
- store.add(c)
-
- assert(store.readLatest.map(_.value) == List(c))
- assert(store.readRecent.map(_.value) == List(a))
- assert(store.readHistory.map(_.value) == List(a))
- }
-
- "HistoryMetricsService" should
- "retain lastest metrics data and allow user to query metrics by path" in {
- implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- val appId = 0
- val service = system.actorOf(Props(new HistoryMetricsService("app0", config)))
- service ! Counter("metric.counter", 0)
- service ! Meter("metric.meter", 0, 0, 0, null)
- service ! Histogram("metric.histogram", 0, 0, 0, 0, 0, 0)
-
- val client = TestProbe()
-
- // Filters metrics with path "metric.counter"
- client.send(service, QueryHistoryMetrics("metric.counter"))
- import scala.concurrent.duration._
- client.expectMsgPF(3.seconds) {
- case history: HistoryMetrics =>
- assert(history.path == "metric.counter")
- val metricList = history.metrics
- metricList.foreach(metricItem =>
- assert(metricItem.value.isInstanceOf[Counter])
- )
- }
-
- // Filters metrics with path "metric.meter"
- client.send(service, QueryHistoryMetrics("metric.meter"))
- client.expectMsgPF(3.seconds) {
- case history: HistoryMetrics =>
- assert(history.path == "metric.meter")
- val metricList = history.metrics
- metricList.foreach(metricItem =>
- assert(metricItem.value.isInstanceOf[Meter])
- )
- }
-
- // Filters metrics with path "metric.histogram"
- client.send(service, QueryHistoryMetrics("metric.histogram"))
- client.expectMsgPF(3.seconds) {
- case history: HistoryMetrics =>
- assert(history.path == "metric.histogram")
- val metricList = history.metrics
- metricList.foreach(metricItem =>
- assert(metricItem.value.isInstanceOf[Histogram])
- )
- }
-
- // Filters metrics with path prefix "metric", all metrics which can
- // match the path prefix will be retained.
- client.send(service, QueryHistoryMetrics("metric"))
- client.expectMsgPF(3.seconds) {
- case history: HistoryMetrics =>
- val metricList = history.metrics
-
- var counterFound = false
- var meterFound = false
- var histogramFound = false
-
- metricList.foreach(metricItem =>
- metricItem.value match {
- case v: Counter => counterFound = true
- case v: Meter => meterFound = true
- case v: Histogram => histogramFound = true
- case _ => // Skip
- }
- )
-
- // All kinds of metric type are reserved.
- assert(counterFound && meterFound && histogramFound)
- }
-
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
deleted file mode 100644
index b391196..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.{Await, Future}
-
-import akka.actor.ActorSystem
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppJar, TestUtil}
-import io.gearpump.jarstore.FilePath
-import io.gearpump.partitioner.{HashPartitioner, Partitioner}
-import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.streaming.{DAG, ProcessorDescription, _}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class JarSchedulerSpec extends WordSpec with Matchers {
- val mockJar1 = AppJar("jar1", FilePath("path"))
- val mockJar2 = AppJar("jar2", FilePath("path"))
- val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1,
- jar = mockJar1)
- val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1,
- jar = mockJar1)
- val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2,
- jar = mockJar2)
- val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
-
- import scala.concurrent.duration._
-
- "JarScheduler" should {
- "schedule tasks depends on app jar" in {
- val system = ActorSystem("JarSchedulerSpec")
- implicit val dispatcher = system.dispatcher
- val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system)
- manager.setDag(dag, Future {
- 0L
- })
- val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified))
- val result = Await.result(manager.getResourceRequestDetails(), 15.seconds)
- assert(result.length == 1)
- assert(result.head.jar == mockJar1)
- assert(result.head.requests.deep == requests.deep)
-
- val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0,
- Resource(2)), 15.seconds)
- assert(tasks.contains(TaskId(0, 0)))
- assert(tasks.contains(TaskId(1, 0)))
-
- val newDag = replaceDAG(dag, 1, task3, 1)
-
- manager.setDag(newDag, Future {
- 0
- })
- val requestDetails = Await.result(manager.getResourceRequestDetails().
- map(_.sortBy(_.jar.name)), 15.seconds)
- assert(requestDetails.length == 2)
- assert(requestDetails.last.jar == mockJar2)
- assert(requestDetails.last.requests.deep == requests.deep)
-
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
- }
-
- def replaceDAG(
- dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int)
- : DAG = {
- val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
- newProcessor.life.birth)
- val newProcessorMap = dag.processors ++
- Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
- newProcessor.id -> newProcessor)
- val newGraph = dag.graph.subGraph(oldProcessorId).
- replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
- new DAG(newVersion, newProcessorMap, newGraph)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
deleted file mode 100644
index 2e07def..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.streaming.appmaster.TaskLocator.Localities
-import io.gearpump.streaming.task.TaskId
-
-class TaskLocatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
- it should "serialize/deserialize correctly" in {
- val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1, 2))))
- Localities.toJson(localities)
-
- localities.localities.mapValues(_.toList) shouldBe
- Localities.fromJson(Localities.toJson(localities)).localities.mapValues(_.toList)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
deleted file mode 100644
index 8153fce..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
-import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppJar, TestUtil, UserConfig}
-import io.gearpump.jarstore.FilePath
-import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
-import io.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, TaskRegistered}
-import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask
-import io.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut
-import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess}
-import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange}
-import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary, SetTaskManager, StartExecutors, _}
-import io.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetail
-import io.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, Task2}
-import io.gearpump.streaming.executor.Executor.RestartTasks
-import io.gearpump.streaming.task.{StartTime, TaskContext, _}
-import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId}
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-import io.gearpump.{Message, TimeStamp}
-
-class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
- implicit var system: ActorSystem = null
-
- val task1Class = classOf[Task1].getName
- val task2Class = classOf[Task2].getName
-
- val mockJar = AppJar("jar_for_test", FilePath("path"))
- val task1 = ProcessorDescription(id = 0, taskClass = task1Class, parallelism = 1, jar = mockJar)
- val task2 = ProcessorDescription(id = 1, taskClass = task2Class, parallelism = 1, jar = mockJar)
-
- val dag: DAG = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
- val dagVersion = 0
-
- val task1LaunchData = TaskLaunchData(task1, Subscriber.of(processorId = 0, dag))
- val task2LaunchData = TaskLaunchData(task2, Subscriber.of(processorId = 1, dag))
-
- val appId = 0
-
- val resource = Resource(2)
- val workerId = WorkerId(0, 0L)
- val executorId = 0
-
- override def beforeEach(): Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- }
-
- override def afterEach(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- it should "recover by requesting new executors when executor stopped unexpectedly" in {
- val env = bootUp
- import env._
- implicit val dispatcher = system.dispatcher
-
- val resourceRequest = Array(ResourceRequest(resource, workerId))
- when(scheduler.executorFailed(executorId)).thenReturn(Future {
- Some(ResourceRequestDetail(mockJar,
- resourceRequest))
- })
-
- taskManager ! ExecutorStopped(executorId)
-
- // When one executor stop, it will also trigger the recovery by restart
- // existing executors
- executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
-
- // Asks for new executors
- val returned = executorManager.receiveN(1).head.asInstanceOf[StartExecutors]
- assert(returned.resources.deep == resourceRequest.deep)
- executorManager.reply(StartExecutorsTimeOut)
-
- // TaskManager cannot handle the TimeOut error itself, escalate to appmaster.
- appMaster.expectMsg(AllocateResourceTimeOut)
- }
-
- it should "recover by restarting existing executors when message loss happen" in {
- val env = bootUp
- import env._
-
- taskManager ! ReplayFromTimestampWindowTrailingEdge(appId)
-
- // Restart the executors so that we can replay from minClock
- executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
- }
-
- import io.gearpump.streaming.appmaster.TaskManager.TaskChangeRegistry
- "TaskChangeRegistry" should "track all modified task registration" in {
- val tasks = List(TaskId(0, 0), TaskId(0, 1))
- val registry = new TaskChangeRegistry(tasks)
- registry.taskChanged(TaskId(0, 0))
- registry.taskChanged(TaskId(0, 1))
- assert(registry.allTaskChanged)
- }
-
- "DAGDiff" should "track all the DAG migration impact" in {
- val defaultEdge = PartitionerDescription(null)
- val a = ProcessorDescription(id = 1, taskClass = null, parallelism = 1)
- val b = ProcessorDescription(id = 2, taskClass = null, parallelism = 1)
- val c = ProcessorDescription(id = 3, taskClass = null, parallelism = 1)
- val left = Graph(a ~ defaultEdge ~> b, a ~ defaultEdge ~> c)
-
- val d = ProcessorDescription(id = 4, taskClass = null, parallelism = 1)
- val right = left.copy
- right.addVertex(d)
- right.addEdge(c, defaultEdge, d)
- val e = a.copy(life = LifeTime(0, 0))
- right.replaceVertex(a, e)
-
- val diff = TaskManager.migrate(DAG(left), DAG(right, version = 1))
- diff.addedProcessors shouldBe List(d.id)
-
- diff.modifiedProcessors shouldBe List(a.id)
-
- diff.impactedUpstream shouldBe List(c.id)
- }
-
- private def bootUp: Env = {
-
- implicit val dispatcher = system.dispatcher
-
- val executorManager = TestProbe()
- val clockService = TestProbe()
- val appMaster = TestProbe()
- val executor = TestProbe()
-
- val scheduler = mock(classOf[JarScheduler])
-
- val dagManager = TestProbe()
-
- val taskManager = system.actorOf(
- Props(new TaskManager(appId, dagManager.ref, scheduler, executorManager.ref, clockService.ref,
- appMaster.ref, "appName")))
-
- dagManager.expectMsgType[WatchChange]
- executorManager.expectMsgType[SetTaskManager]
-
- // Step1: first transition from Unitialized to ApplicationReady
- executorManager.expectMsgType[ExecutorResourceUsageSummary]
- dagManager.expectMsgType[NewDAGDeployed]
-
- // Step2: Get Additional Resource Request
- when(scheduler.getResourceRequestDetails())
- .thenReturn(Future {
- Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource,
- WorkerId.unspecified))))
- })
-
- // Step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG
- dagManager.expectMsg(GetLatestDAG)
- dagManager.reply(LatestDAG(dag))
-
- // Step4: Start remote Executors.
- // received Broadcast
- executorManager.expectMsg(BroadCast(StartDynamicDag(dag.version)))
- executorManager.expectMsgType[StartExecutors]
-
- when(scheduler.scheduleTask(mockJar, workerId, executorId, resource))
- .thenReturn(Future(List(TaskId(0, 0), TaskId(1, 0))))
-
- // Step5: Executor is started.
- executorManager.reply(ExecutorStarted(executorId, resource, workerId, Some(mockJar)))
-
- // Step6: Prepare to start Task. First GetTaskLaunchData.
- val taskLaunchData: PartialFunction[Any, TaskLaunchData] = {
- case GetTaskLaunchData(_, 0, executorStarted) =>
- task1LaunchData.copy(context = executorStarted)
- case GetTaskLaunchData(_, 1, executorStarted) =>
- task2LaunchData.copy(context = executorStarted)
- }
-
- val launchData1 = dagManager.expectMsgPF()(taskLaunchData)
- dagManager.reply(launchData1)
-
- val launchData2 = dagManager.expectMsgPF()(taskLaunchData)
- dagManager.reply(launchData2)
-
- // Step7: Launch Task
- val launchTaskMatch: PartialFunction[Any, RegisterTask] = {
- case UniCast(executorId, launch: LaunchTasks) =>
- RegisterTask(launch.taskId.head, executorId, HostPort("127.0.0.1:3000"))
- }
-
- // Taskmanager should return the latest start clock to task(0,0)
- clockService.expectMsg(GetStartClock)
- clockService.reply(StartClock(0))
-
- // Step8: Task is started. registerTask.
- val registerTask1 = executorManager.expectMsgPF()(launchTaskMatch)
- taskManager.tell(registerTask1, executor.ref)
- executor.expectMsgType[TaskRegistered]
-
- val registerTask2 = executorManager.expectMsgPF()(launchTaskMatch)
- taskManager.tell(registerTask2, executor.ref)
- executor.expectMsgType[TaskRegistered]
-
- // Step9: start broadcasting TaskLocations.
- import scala.concurrent.duration._
- assert(executorManager.expectMsgPF(5.seconds) {
- case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[TaskLocationsReady]
- })
-
- // Step10: Executor confirm it has received TaskLocationsReceived(version, executorId)
- taskManager.tell(TaskLocationsReceived(dag.version, executorId), executor.ref)
-
- // Step11: Tell ClockService to update DAG.
- clockService.expectMsgType[ChangeToNewDAG]
- clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp]))
-
- // Step12: start all tasks
- import scala.concurrent.duration._
- assert(executorManager.expectMsgPF(5.seconds) {
- case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[StartAllTasks]
- })
-
- // Step13, Tell executor Manager the updated usage status of executors.
- executorManager.expectMsgType[ExecutorResourceUsageSummary]
-
- // Step14: transition from DynamicDAG to ApplicationReady
- Env(executorManager, clockService, appMaster, executor, taskManager, scheduler)
- }
-}
-
-object TaskManagerSpec {
- case class Env(
- executorManager: TestProbe,
- clockService: TestProbe,
- appMaster: TestProbe,
- executor: TestProbe,
- taskManager: ActorRef,
- scheduler: JarScheduler)
-
- class Task1(taskContext: TaskContext, userConf: UserConfig)
- extends Task(taskContext, userConf) {
- override def onStart(startTime: StartTime): Unit = {}
- override def onNext(msg: Message): Unit = {}
- }
-
- class Task2(taskContext: TaskContext, userConf: UserConfig)
- extends Task(taskContext, userConf) {
- override def onStart(startTime: StartTime): Unit = {}
- override def onNext(msg: Message): Unit = {}
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
deleted file mode 100644
index e8417ea..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.streaming.appmaster.TaskRegistry.{Accept, Reject, TaskLocation, TaskLocations}
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.transport.HostPort
-class TaskRegistrySpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
- it should "maintain registered tasks" in {
- val task0 = TaskId(0, 0)
- val task1 = TaskId(0, 1)
- val task2 = TaskId(0, 2)
-
- val register = new TaskRegistry(expectedTasks = List(task0, task1, task2))
- val host1 = HostPort("127.0.0.1:3000")
- val host2 = HostPort("127.0.0.1:3001")
-
- val executorId = 0
- assert(Accept == register.registerTask(task0, TaskLocation(executorId, host1)))
- assert(Accept == register.registerTask(task1, TaskLocation(executorId, host1)))
- assert(Accept == register.registerTask(task2, TaskLocation(executorId, host2)))
-
- assert(Reject == register.registerTask(TaskId(100, 0), TaskLocation(executorId, host2)))
-
- assert(register.isAllTasksRegistered)
- val TaskLocations(taskLocations) = register.getTaskLocations
- val tasksOnHost1 = taskLocations.get(host1).get
- val tasksOnHost2 = taskLocations.get(host2).get
- assert(tasksOnHost1.contains(task0))
- assert(tasksOnHost1.contains(task1))
- assert(tasksOnHost2.contains(task2))
-
- assert(register.getExecutorId(task0) == Some(executorId))
- assert(register.isTaskRegisteredForExecutor(executorId))
-
- register.processorExecutors(0) shouldBe Map(
- executorId -> List(task0, task1, task2)
- )
-
- register.usedResource.resources shouldBe Map(
- executorId -> Resource(3)
- )
- }
-}