You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:14 UTC

[04/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala
new file mode 100644
index 0000000..616894a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.transaction.api
+
+import org.apache.gearpump.{Message, TimeStamp}
+
+import scala.util.Try
+
+/**
+ * Filters offsets and store the mapping from timestamp to offset
+ */
+trait MessageFilter {
+  def filter(messageAndOffset: (Message, Long)): Option[Message]
+}
+
+/**
+ * Resolves timestamp to offset by look up the underlying storage
+ */
+trait OffsetTimeStampResolver {
+  def resolveOffset(time: TimeStamp): Try[Long]
+}
+
+/**
+ * Manages message's offset on TimeReplayableSource and timestamp
+ */
+trait OffsetManager extends MessageFilter with OffsetTimeStampResolver {
+  def close(): Unit
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala
new file mode 100644
index 0000000..40fc088
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.transaction.api
+
+import org.apache.gearpump.TimeStamp
+
+import scala.util.Try
+
+object OffsetStorage {
+
+  /**
+   * StorageEmpty means no data has been stored
+   */
+  case object StorageEmpty extends Throwable
+
+  /**
+   * Overflow means the looked up time is
+   * larger than the maximum stored TimeStamp
+   */
+  case class Overflow(maxTimestamp: Array[Byte]) extends Throwable
+
+  /**
+   * Underflow means the looked up time is
+   * smaller than the minimum stored TimeStamp
+   */
+  case class Underflow(minTimestamp: Array[Byte]) extends Throwable
+}
+
+/**
+ * OffsetStorage stores the mapping from TimeStamp to Offset
+ */
+trait OffsetStorage {
+  /**
+   * Tries to look up the time in the OffsetStorage return the corresponding Offset if the time is
+   * in the range of stored TimeStamps or one of the failure info (StorageEmpty, Overflow,
+   * Underflow)
+   *
+   * @param time the time to look for
+   * @return the corresponding offset if the time is in the range, otherwise failure
+   */
+  def lookUp(time: TimeStamp): Try[Array[Byte]]
+
+  def append(time: TimeStamp, offset: Array[Byte]): Unit
+
+  def close(): Unit
+}
+
+trait OffsetStorageFactory extends java.io.Serializable {
+  def getOffsetStorage(dir: String): OffsetStorage
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
new file mode 100644
index 0000000..16f98d5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.transaction.api
+
+import org.apache.gearpump.streaming.source.DataSource
+
+/**
+ * AT-LEAST-ONCE API. Represents a data source which allow replaying.
+ *
+ * Subclass should be able to replay messages on recovery from the time
+ * when an application crashed.
+ */
+trait TimeReplayableSource extends DataSource
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
new file mode 100644
index 0000000..2ddca3a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.transaction.api
+
+import org.apache.gearpump.{Message, TimeStamp}
+
+/**
+ * TimeStampFilter filters out messages that are obsolete.
+ */
+trait TimeStampFilter extends java.io.Serializable {
+  def filter(msg: Message, predicate: TimeStamp): Option[Message]
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala
new file mode 100644
index 0000000..97c9385
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/util/ActorPathUtil.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.util
+
+import akka.actor.{ActorPath, ActorRef}
+
+import org.apache.gearpump.streaming.task.TaskId
+
+object ActorPathUtil {
+
+  def executorActorName(executorId: Int): String = executorId.toString
+
+  def taskActorName(taskId: TaskId): String = {
+    s"processor_${taskId.processorId}_task_${taskId.index}"
+  }
+
+  def taskActorPath(appMaster: ActorRef, executorId: Int, taskId: TaskId): ActorPath = {
+    val executorManager = appMaster.path.child(executorManagerActorName)
+    val executor = executorManager.child(executorActorName(executorId))
+    val task = executor.child(taskActorName(taskId))
+    task
+  }
+
+  def executorManagerActorName: String = "executors"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
deleted file mode 100644
index 13b8e34..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/DAGSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph.Node
-
-class DAGSpec extends PropSpec with PropertyChecks with Matchers {
-
-  val parallelismGen = Gen.chooseNum[Int](1, 100)
-
-  property("DAG should be built correctly for a single task") {
-    forAll(parallelismGen) { (parallelism: Int) =>
-      val task = ProcessorDescription(id = 0, taskClass = "task", parallelism = parallelism)
-      val graph = Graph[ProcessorDescription, PartitionerDescription](task)
-      val dag = DAG(graph)
-      dag.processors.size shouldBe 1
-      assert(dag.taskCount == parallelism)
-      dag.tasks.sortBy(_.index) shouldBe (0 until parallelism).map(index => TaskId(0, index))
-      dag.graph.edges shouldBe empty
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
deleted file mode 100644
index 7938415..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/MessageSerializerSpec.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming
-
-import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers}
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.streaming.task._
-import io.gearpump.transport.netty.WrappedChannelBuffer
-
-class MessageSerializerSpec extends WordSpec with Matchers {
-
-  def testSerializer[T](obj: T, taskMessageSerializer: TaskMessageSerializer[T]): T = {
-    val length = taskMessageSerializer.getLength(obj)
-    val bout = new ChannelBufferOutputStream(ChannelBuffers.buffer(length))
-    taskMessageSerializer.write(bout, obj)
-    val input = new WrappedChannelBuffer(ChannelBuffers.wrappedBuffer(bout.buffer().array()))
-    taskMessageSerializer.read(input)
-  }
-
-  "SerializedMessageSerializer" should {
-    "serialize and deserialize SerializedMessage properly" in {
-      val serializer = new SerializedMessageSerializer
-      val data = new Array[Byte](256)
-      new java.util.Random().nextBytes(data)
-      val msg = SerializedMessage(1024, data)
-      val result = testSerializer(msg, serializer)
-      assert(result.timeStamp == msg.timeStamp && result.bytes.sameElements(msg.bytes))
-    }
-  }
-
-  "TaskIdSerializer" should {
-    "serialize and deserialize TaskId properly" in {
-      val taskIdSerializer = new TaskIdSerializer
-      val taskId = TaskId(1, 3)
-      assert(testSerializer(taskId, taskIdSerializer).equals(taskId))
-    }
-  }
-
-  "AckRequestSerializer" should {
-    "serialize and deserialize AckRequest properly" in {
-      val serializer = new AckRequestSerializer
-      val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024)
-      assert(testSerializer(ackRequest, serializer).equals(ackRequest))
-    }
-  }
-
-  "InitialAckRequestSerializer" should {
-    "serialize and deserialize AckRequest properly" in {
-      val serializer = new InitialAckRequestSerializer
-      val ackRequest = InitialAckRequest(TaskId(1, 2), 1024)
-      assert(testSerializer(ackRequest, serializer).equals(ackRequest))
-    }
-  }
-
-  "AckSerializer" should {
-    "serialize and deserialize Ack properly" in {
-      val serializer = new AckSerializer
-      val ack = Ack(TaskId(1, 2), 1024, 1023, 1799)
-      assert(testSerializer(ack, serializer).equals(ack))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
deleted file mode 100644
index 40310f7..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/MockUtil.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming
-
-import akka.actor.{Actor, ActorSystem}
-import akka.testkit.TestActorRef
-import org.mockito.{ArgumentMatcher, Matchers, Mockito}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.streaming.task.{TaskContext, TaskId}
-
-object MockUtil {
-
-  lazy val system: ActorSystem = ActorSystem("mockUtil", TestUtil.DEFAULT_CONFIG)
-
-  def mockTaskContext: TaskContext = {
-    val context = Mockito.mock(classOf[TaskContext])
-    Mockito.when(context.self).thenReturn(Mockito.mock(classOf[TestActorRef[Actor]]))
-    Mockito.when(context.system).thenReturn(system)
-    Mockito.when(context.parallelism).thenReturn(1)
-    Mockito.when(context.taskId).thenReturn(TaskId(0, 0))
-    context
-  }
-
-  def argMatch[T](func: T => Boolean): T = {
-    Matchers.argThat(new ArgumentMatcher[T] {
-      override def matches(param: Any): Boolean = {
-        val mesage = param.asInstanceOf[T]
-        func(mesage)
-      }
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala b/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
deleted file mode 100644
index 2ea8b84..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/StreamingTestUtil.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming
-
-import akka.actor._
-import akka.testkit.TestActorRef
-
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster.appmaster.AppMasterRuntimeInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.{AppDescription, AppMasterContext, MiniCluster, UserConfig}
-import io.gearpump.streaming.appmaster.AppMaster
-import io.gearpump.util.Graph
-
-object StreamingTestUtil {
-  private var executorId = 0
-  val testUserName = "testuser"
-
-  def startAppMaster(miniCluster: MiniCluster, appId: Int): TestActorRef[AppMaster] = {
-
-    implicit val actorSystem = miniCluster.system
-    val masterConf = AppMasterContext(appId, testUserName, Resource(1), null,
-      None, miniCluster.mockMaster, AppMasterRuntimeInfo(appId, appName = appId.toString))
-
-    val app = StreamApplication("test", Graph.empty, UserConfig.empty)
-    val appDescription = AppDescription(app.name, app.appMaster.getName, app.userConfig)
-    val props = Props(new AppMaster(masterConf, appDescription))
-    val appMaster = miniCluster.launchActor(props).asInstanceOf[TestActorRef[AppMaster]]
-    val registerAppMaster = RegisterAppMaster(appMaster, masterConf.registerData)
-    miniCluster.mockMaster.tell(registerAppMaster, appMaster)
-
-    appMaster
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
deleted file mode 100644
index 90fb8ab..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.duration._
-
-import akka.actor.{ActorRef, Props}
-import akka.testkit.{TestActorRef, TestProbe}
-import org.scalatest._
-
-import io.gearpump.Message
-import io.gearpump.cluster.AppMasterToMaster._
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.ClientToMaster.ShutdownApplication
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated}
-import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.jarstore.FilePath
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.task.{StartTime, TaskContext, _}
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-  protected override def config = TestUtil.DEFAULT_CONFIG
-
-  var appMaster: ActorRef = null
-
-  val appId = 0
-  val workerId = WorkerId(1, 0L)
-  val resource = Resource(1)
-  val taskDescription1 = Processor[TaskA](2)
-  val taskDescription2 = Processor[TaskB](2)
-  val partitioner = new HashPartitioner
-  var conf: UserConfig = null
-
-  var mockTask: TestProbe = null
-
-  var mockMaster: TestProbe = null
-  var mockMasterProxy: ActorRef = null
-
-  var mockWorker: TestProbe = null
-  var appDescription: AppDescription = null
-  var appMasterContext: AppMasterContext = null
-  var appMasterRuntimeInfo: AppMasterRuntimeInfo = null
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-
-    mockTask = TestProbe()(getActorSystem)
-
-    mockMaster = TestProbe()(getActorSystem)
-    mockWorker = TestProbe()(getActorSystem)
-    mockMaster.ignoreMsg(ignoreSaveAppData)
-    appMasterRuntimeInfo = AppMasterRuntimeInfo(appId, appName = appId.toString)
-
-    implicit val system = getActorSystem
-    conf = UserConfig.empty.withValue(AppMasterSpec.MASTER, mockMaster.ref)
-    val mockJar = AppJar("for_test", FilePath("path"))
-    appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar),
-      mockMaster.ref, appMasterRuntimeInfo)
-    val graph = Graph(taskDescription1 ~ partitioner ~> taskDescription2)
-    val streamApp = StreamApplication("test", graph, conf)
-    appDescription = Application.ApplicationToAppDescription(streamApp)
-    import scala.concurrent.duration._
-    mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path),
-      30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
-    TestActorRef[AppMaster](
-      AppMasterRuntimeEnvironment.props(List(mockMasterProxy.path), appDescription,
-        appMasterContext))(getActorSystem)
-
-    val registerAppMaster = mockMaster.receiveOne(15.seconds)
-    assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
-    appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
-
-    mockMaster.reply(AppMasterRegistered(appId))
-    mockMaster.expectMsg(15.seconds, GetAppData(appId, "DAG"))
-    mockMaster.reply(GetAppDataResult("DAG", null))
-    mockMaster.expectMsg(15.seconds, GetAppData(appId, "startClock"))
-
-    mockMaster.reply(GetAppDataResult("startClock", 0L))
-
-    mockMaster.expectMsg(15.seconds, RequestResource(appId, ResourceRequest(Resource(4),
-      workerId = WorkerId.unspecified)))
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-
-  "AppMaster" should {
-    "kill it self when allocate resource time out" in {
-      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2),
-        mockWorker.ref, workerId))))
-      mockMaster.expectMsg(60.seconds, ShutdownApplication(appId))
-    }
-
-    "reschedule the resource when the worker reject to start executor" in {
-      val resource = Resource(4)
-      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource,
-        mockWorker.ref, workerId))))
-      mockWorker.expectMsgClass(classOf[LaunchExecutor])
-      mockWorker.reply(ExecutorLaunchRejected(""))
-      mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)))
-    }
-
-    "find a new master when lost connection with master" in {
-
-      val watcher = TestProbe()(getActorSystem)
-      watcher.watch(mockMasterProxy)
-      getActorSystem.stop(mockMasterProxy)
-      watcher.expectTerminated(mockMasterProxy)
-      // Make sure the parent of mockMasterProxy has received the Terminated message.
-      // Issus address: https://github.com/gearpump/gearpump/issues/1919
-      Thread.sleep(2000)
-
-      import scala.concurrent.duration._
-      mockMasterProxy = getActorSystem.actorOf(Props(new MasterProxy(List(mockMaster.ref.path),
-        30.seconds)), AppMasterSpec.MOCK_MASTER_PROXY)
-      mockMaster.expectMsgClass(15.seconds, classOf[RegisterAppMaster])
-    }
-
-    //    // TODO: This test is failing on Travis randomly
-    //    // We have not identifed the root cause.
-    //    // Check: https://travis-ci.org/intel-hadoop/gearpump/builds/56826843
-    //    // Issue tracker: https://github.com/intel-hadoop/gearpump/issues/733
-    //
-    //    "launch executor and task properly" in {
-    //      mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(4), mockWorker.ref,
-    //       workerId))))
-    //      mockWorker.expectMsgClass(classOf[LaunchExecutor])
-    //
-    //      val workerSystem = ActorSystem("worker", TestUtil.DEFAULT_CONFIG)
-    //      mockWorker.reply(RegisterActorSystem(ActorUtil.getSystemAddress(workerSystem).toString))
-    //      for (i <- 1 to 4) {
-    //        mockMaster.expectMsg(10 seconds, AppMasterSpec.TaskStarted)
-    //      }
-    //
-    //      // clock status: task(0,0) -> 1, task(0,1)->0, task(1, 0)->0, task(1,1)->0
-    //      appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
-    //
-    //      // there is no further upstream, so the upstreamMinClock is Long.MaxValue
-    //      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
-    //
-    //      // check min clock
-    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
-    //      mockTask.expectMsg(LatestMinClock(0))
-    //
-    //
-    //      // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0
-    //      appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
-    //
-    //      // there is no further upstream, so the upstreamMinClock is Long.MaxValue
-    //      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
-    //
-    //      // check min clock
-    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
-    //      mockTask.expectMsg(LatestMinClock(0))
-    //
-    //      // Clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->0
-    //      appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
-    //
-    //      // Min clock of processor 0 (Task(0, 0) and Task(0, 1))
-    //      mockTask.expectMsg(UpstreamMinClock(1))
-    //
-    //      // check min clock
-    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
-    //      mockTask.expectMsg(LatestMinClock(0))
-    //
-    //      // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 1)->0, task(1,1)->1
-    //      appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
-    //
-    //      // min clock of processor 0 (Task(0, 0) and Task(0, 1))
-    //      mockTask.expectMsg(UpstreamMinClock(1))
-    //
-    //      // check min clock
-    //      appMaster.tell(GetLatestMinClock, mockTask.ref)
-    //      mockTask.expectMsg(LatestMinClock(1))
-    //
-    //      // shutdown worker and all executor on this work, expect appmaster to ask
-    //      // for new resources
-    //      workerSystem.shutdown()
-    //      mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(4), relaxation =
-    //        Relaxation.ONEWORKER)))
-    //    }
-  }
-
-  def ignoreSaveAppData: PartialFunction[Any, Boolean] = {
-    case msg: SaveAppData => true
-  }
-}
-
-object AppMasterSpec {
-  val MASTER = "master"
-  case object TaskStarted
-
-  val MOCK_MASTER_PROXY = "mockMasterProxy"
-}
-
-class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
-  val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
-  override def onStart(startTime: StartTime): Unit = {
-    master ! AppMasterSpec.TaskStarted
-  }
-
-  override def onNext(msg: Message): Unit = {}
-}
-
-class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
-  val master = userConf.getValue[ActorRef](AppMasterSpec.MASTER).get
-  override def onStart(startTime: StartTime): Unit = {
-    master ! AppMasterSpec.TaskStarted
-  }
-
-  override def onNext(msg: Message): Unit = {}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
deleted file mode 100644
index 01744a3..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.{Future, Promise}
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
-import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock}
-import io.gearpump.streaming.appmaster.ClockServiceSpec.Store
-import io.gearpump.streaming.storage.AppDataStore
-import io.gearpump.streaming.task.{GetLatestMinClock, GetStartClock, UpstreamMinClock, _}
-import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
-  with WordSpecLike with Matchers with BeforeAndAfterAll {
-
-  def this() = this(ActorSystem("ClockServiceSpec", TestUtil.DEFAULT_CONFIG))
-
-  val hash = Partitioner[HashPartitioner]
-  val task1 = ProcessorDescription(id = 0, taskClass = classOf[TaskActor].getName, parallelism = 1)
-  val task2 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1)
-  val dag = DAG(Graph(task1 ~ hash ~> task2))
-
-  override def afterAll {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  "The ClockService" should {
-    "maintain a global view of message timestamp in the application" in {
-      val store = new Store()
-      val startClock = 100L
-      store.put(ClockService.START_CLOCK, startClock)
-      val clockService = system.actorOf(Props(new ClockService(dag, store)))
-      clockService ! GetLatestMinClock
-      expectMsg(LatestMinClock(startClock))
-
-      // task(0,0): clock(101); task(1,0): clock(100)
-      clockService ! UpdateClock(TaskId(0, 0), 101)
-
-      // There is no upstream, so pick Long.MaxValue
-      expectMsg(UpstreamMinClock(Long.MaxValue))
-
-      // Min clock is updated
-      clockService ! GetLatestMinClock
-      expectMsg(LatestMinClock(100))
-
-      // task(0,0): clock(101); task(1,0): clock(101)
-      clockService ! UpdateClock(TaskId(1, 0), 101)
-
-      // Upstream is Task(0, 0), 101
-      expectMsg(UpstreamMinClock(101))
-
-      // Min clock is updated
-      clockService ! GetLatestMinClock
-      expectMsg(LatestMinClock(101))
-    }
-
-    "act on ChangeToNewDAG and make sure downstream clock smaller than upstreams" in {
-      val store = new Store()
-      val startClock = 100L
-      store.put(ClockService.START_CLOCK, startClock)
-      val clockService = system.actorOf(Props(new ClockService(dag, store)))
-      val task = TestProbe()
-      clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref)
-      task.expectMsgType[UpstreamMinClock]
-
-      val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
-        parallelism = 1)
-      val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName,
-        parallelism = 1)
-      val task5 = ProcessorDescription(id = 5, taskClass = classOf[TaskActor].getName,
-        parallelism = 1)
-      val dagAddMiddleNode = DAG(Graph(
-        task1 ~ hash ~> task2,
-        task1 ~ hash ~> task3,
-        task3 ~ hash ~> task2,
-        task2 ~ hash ~> task4,
-        task5 ~ hash ~> task1
-      ))
-      val user = TestProbe()
-      clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref)
-
-      val clocks = user.expectMsgPF() {
-        case ChangeToNewDAGSuccess(clocks) =>
-          clocks
-      }
-
-      // For intermediate task, pick its upstream as initial clock
-      assert(clocks(task3.id) == clocks(task1.id))
-
-      // For sink task, pick its upstream as initial clock
-      assert(clocks(task4.id) == clocks(task2.id))
-
-      // For source task, set the initial clock as startClock
-      assert(clocks(task5.id) == startClock)
-    }
-
-    "maintain global checkpoint time" in {
-      val store = new Store()
-      val startClock = 100L
-      store.put(ClockService.START_CLOCK, startClock)
-      val clockService = system.actorOf(Props(new ClockService(dag, store)))
-      clockService ! UpdateClock(TaskId(0, 0), 200L)
-      expectMsgType[UpstreamMinClock]
-      clockService ! UpdateClock(TaskId(1, 0), 200L)
-      expectMsgType[UpstreamMinClock]
-
-      clockService ! GetStartClock
-      expectMsg(StartClock(200L))
-
-      val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", true)
-      val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
-        parallelism = 1, taskConf = conf)
-      val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName,
-        parallelism = 1, taskConf = conf)
-      val dagWithStateTasks = DAG(Graph(
-        task1 ~ hash ~> task2,
-        task1 ~ hash ~> task3,
-        task3 ~ hash ~> task2,
-        task2 ~ hash ~> task4
-      ), version = 1)
-
-      val taskId3 = TaskId(3, 0)
-      val taskId4 = TaskId(4, 0)
-
-      clockService ! ChangeToNewDAG(dagWithStateTasks)
-      expectMsgType[ChangeToNewDAGSuccess]
-
-      clockService ! ReportCheckpointClock(taskId3, startClock)
-      clockService ! ReportCheckpointClock(taskId4, startClock)
-      clockService ! GetStartClock
-      expectMsg(StartClock(startClock))
-
-      clockService ! ReportCheckpointClock(taskId3, 200L)
-      clockService ! ReportCheckpointClock(taskId4, 300L)
-      clockService ! GetStartClock
-      expectMsg(StartClock(startClock))
-
-      clockService ! ReportCheckpointClock(taskId3, 300L)
-      clockService ! GetStartClock
-      expectMsg(StartClock(300L))
-    }
-  }
-
-  "ProcessorClock" should {
-    "maintain the min clock of current processor" in {
-      val processorId = 0
-      val parallism = 3
-      val clock = new ProcessorClock(processorId, LifeTime.Immortal, parallism)
-      clock.init(100L)
-      clock.updateMinClock(0, 101)
-      assert(clock.min == 100L)
-
-      clock.updateMinClock(1, 102)
-      assert(clock.min == 100L)
-
-      clock.updateMinClock(2, 103)
-      assert(clock.min == 101L)
-    }
-  }
-
-  "HealthChecker" should {
-    "report stalling if the clock is not advancing" in {
-      val healthChecker = new HealthChecker(stallingThresholdSeconds = 1)
-      val source = ProcessorDescription(id = 0, taskClass = null, parallelism = 1)
-      val sourceClock = new ProcessorClock(0, LifeTime.Immortal, 1)
-      sourceClock.init(0L)
-      val sink = ProcessorDescription(id = 1, taskClass = null, parallelism = 1)
-      val sinkClock = new ProcessorClock(1, LifeTime.Immortal, 1)
-      sinkClock.init(0L)
-      val graph = Graph.empty[ProcessorDescription, PartitionerDescription]
-      graph.addVertex(source)
-      graph.addVertex(sink)
-      graph.addEdge(source, PartitionerDescription(null), sink)
-      val dag = DAG(graph)
-      val clocks = Map(
-        0 -> sourceClock,
-        1 -> sinkClock
-      )
-
-      sourceClock.updateMinClock(0, 100L)
-      sinkClock.updateMinClock(0, 100L)
-
-      // Clock advances from 0 to 100, there is no stalling.
-      healthChecker.check(currentMinClock = 100, clocks, dag, 200)
-      healthChecker.getReport.stallingTasks shouldBe List.empty[TaskId]
-
-      // Clock not advancing.
-      // Pasted time exceed the stalling threshold, report stalling
-      healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
-
-      // The source task is stalling the clock
-      healthChecker.getReport.stallingTasks shouldBe List(TaskId(0, 0))
-
-      // Advance the source clock
-      sourceClock.updateMinClock(0, 101L)
-      healthChecker.check(currentMinClock = 100, clocks, dag, 1300)
-      // The sink task is stalling the clock
-      healthChecker.getReport.stallingTasks shouldBe List(TaskId(1, 0))
-    }
-  }
-}
-
-object ClockServiceSpec {
-
-  class Store extends AppDataStore {
-
-    private var map = Map.empty[String, Any]
-
-    def put(key: String, value: Any): Future[Any] = {
-      map = map + (key -> value)
-      Promise.successful(value).future
-    }
-
-    def get(key: String): Future[Any] = {
-      Promise.successful(map.get(key).getOrElse(null)).future
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
deleted file mode 100644
index fb633f9..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.partitioner.{HashPartitioner, Partitioner}
-import io.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange}
-import io.gearpump.streaming.task.{Subscriber, TaskActor}
-import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, StreamApplication}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
-
-  val hash = Partitioner[HashPartitioner]
-  val task1 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1)
-  val task2 = ProcessorDescription(id = 2, taskClass = classOf[TaskActor].getName, parallelism = 1)
-  val graph = Graph(task1 ~ hash ~> task2)
-  val dag = DAG(graph)
-  implicit var system: ActorSystem = null
-  val appId = 0
-  lazy val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, graph)
-
-  "DagManager" should {
-    import io.gearpump.streaming.appmaster.ClockServiceSpec.Store
-    "maintain the dags properly" in {
-      val store = new Store
-
-      val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, store, Some(dag))))
-      val client = TestProbe()
-      client.send(dagManager, GetLatestDAG)
-      client.expectMsg(LatestDAG(dag))
-
-      client.send(dagManager, GetTaskLaunchData(dag.version, task1.id, null))
-      val task1LaunchData = TaskLaunchData(task1, Subscriber.of(task1.id, dag))
-      client.expectMsg(task1LaunchData)
-
-      val task2LaunchData = TaskLaunchData(task2, Subscriber.of(task2.id, dag))
-      client.send(dagManager, GetTaskLaunchData(dag.version, task2.id, null))
-      client.expectMsg(task2LaunchData)
-
-      val watcher = TestProbe()
-      client.send(dagManager, WatchChange(watcher.ref))
-      val task3 = task2.copy(id = 3, life = LifeTime(100, Long.MaxValue))
-
-      client.send(dagManager, ReplaceProcessor(task2.id, task3))
-      client.expectMsg(DAGOperationSuccess)
-
-      client.send(dagManager, GetLatestDAG)
-      val newDag = client.expectMsgPF() {
-        case LatestDAG(dag) => dag
-      }
-      assert(newDag.processors.contains(task3.id))
-      watcher.expectMsgType[LatestDAG]
-
-      val task4 = task3.copy(id = 4)
-      client.send(dagManager, ReplaceProcessor(task3.id, task4))
-      client.expectMsgType[DAGOperationFailed]
-
-      client.send(dagManager, NewDAGDeployed(newDag.version))
-      client.send(dagManager, ReplaceProcessor(task3.id, task4))
-      client.expectMsg(DAGOperationSuccess)
-    }
-
-    "retrieve last stored dag properly" in {
-      val store = new Store
-      val newGraph = Graph(task1 ~ hash ~> task2 ~> task2)
-      val newDag = DAG(newGraph)
-      store.put(StreamApplication.DAG, newDag)
-      val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, store, Some(dag))))
-      val client = TestProbe()
-      client.send(dagManager, GetLatestDAG)
-      client.expectMsg(LatestDAG(newDag))
-    }
-  }
-
-  override def afterAll {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  override def beforeAll {
-    this.system = ActorSystem("DagManagerSpec", TestUtil.DEFAULT_CONFIG)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
deleted file mode 100644
index a57a1ae..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor._
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import org.scalatest._
-
-import io.gearpump.TestProbeUtil
-import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
-import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo}
-import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.jarstore.FilePath
-import io.gearpump.streaming.ExecutorId
-import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
-import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, _}
-import io.gearpump.streaming.appmaster.ExecutorManagerSpec.StartExecutorActorPlease
-import io.gearpump.util.ActorSystemBooter.BindLifeCycle
-import io.gearpump.util.LogUtil
-
-class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  implicit var system: ActorSystem = null
-
-  private val LOG = LogUtil.getLogger(getClass)
-  private val appId = 0
-  private val resource = Resource(10)
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  private def startExecutorSystems = {
-    val master = TestProbe()
-    val taskManager = TestProbe()
-    val executor = TestProbe()
-    val userConfig = UserConfig.empty
-
-    val username = "user"
-    val appName = "app"
-    val appJar = Some(AppJar("for_test", FilePath("path")))
-
-    val appMasterContext = AppMasterContext(appId, username, null, null, appJar, master.ref, null)
-
-    val executorFactory = (_: ExecutorContext, _: UserConfig, _: Address, _: ExecutorId) => {
-      executor.ref ! StartExecutorActorPlease
-      TestProbeUtil.toProps(executor)
-    }
-    val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext,
-      executorFactory, ConfigFactory.empty, appName)))
-
-    taskManager.send(executorManager, SetTaskManager(taskManager.ref))
-    val resourceRequest = Array(ResourceRequest(resource, WorkerId.unspecified))
-
-    // Starts executors
-    taskManager.send(executorManager, StartExecutors(resourceRequest, appJar.get))
-
-    // Asks master to start executor systems
-    import scala.concurrent.duration._
-    val startExecutorSystem = master.receiveOne(5.seconds).asInstanceOf[StartExecutorSystems]
-    assert(startExecutorSystem.resources == resourceRequest)
-    import startExecutorSystem.executorSystemConfig.{classPath, executorAkkaConfig, jar, jvmArguments, username => returnedUserName}
-    assert(startExecutorSystem.resources == resourceRequest)
-
-    assert(classPath.length == 0)
-    assert(jvmArguments.length == 0)
-    assert(jar == appJar)
-    assert(returnedUserName == username)
-    assert(executorAkkaConfig.isEmpty)
-
-    (master, executor, taskManager, executorManager)
-  }
-
-  it should "report timeout to taskManager" in {
-    import io.gearpump.streaming.appmaster.ExecutorManager._
-    val (master, executor, taskManager, executorManager) = startExecutorSystems
-    master.reply(StartExecutorSystemTimeout)
-    taskManager.expectMsg(StartExecutorsTimeOut)
-  }
-
-  it should "start executor actor correctly" in {
-    val (master, executor, taskManager, executorManager) = startExecutorSystems
-    val executorSystemDaemon = TestProbe()
-    val worker = TestProbe()
-    val workerId = WorkerId(0, 0L)
-    val workerInfo = WorkerInfo(workerId, worker.ref)
-    val executorSystem = ExecutorSystem(0, null, executorSystemDaemon.ref,
-      resource, workerInfo)
-    master.reply(ExecutorSystemStarted(executorSystem, None))
-    import scala.concurrent.duration._
-    val bindLifeWith = executorSystemDaemon.receiveOne(3.seconds).asInstanceOf[BindLifeCycle]
-    val proxyExecutor = bindLifeWith.actor
-    executor.expectMsg(StartExecutorActorPlease)
-
-    val executorId = 0
-
-    // Registers executor
-    executor.send(executorManager, RegisterExecutor(proxyExecutor, executorId,
-      resource, workerInfo))
-    taskManager.expectMsgType[ExecutorStarted]
-
-    // Broadcasts message to childs
-    taskManager.send(executorManager, BroadCast("broadcast"))
-    executor.expectMsg("broadcast")
-
-    // Unicast
-    taskManager.send(executorManager, UniCast(executorId, "unicast"))
-    executor.expectMsg("unicast")
-
-    // Updates executor resource status
-    val usedResource = Resource(5)
-    executorManager ! ExecutorResourceUsageSummary(Map(executorId -> usedResource))
-    worker.expectMsg(ChangeExecutorResource(appId, executorId, resource - usedResource))
-
-    // Watches for executor termination
-    system.stop(executor.ref)
-    LOG.info("Shutting down executor, and wait taskManager to get notified")
-    taskManager.expectMsg(ExecutorStopped(executorId))
-  }
-}
-
-object ExecutorManagerSpec {
-  case object StartExecutorActorPlease
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
deleted file mode 100644
index 9d4432a..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorRestartPolicySpec.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.duration._
-
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.streaming.executor.ExecutorRestartPolicy
-import io.gearpump.streaming.task.TaskId
-
-class ExecutorRestartPolicySpec extends WordSpec with Matchers {
-
-  "ExecutorRestartPolicy" should {
-    "decide whether to restart the executor" in {
-      val executorId1 = 1
-      val executorId2 = 2
-      val taskId = TaskId(0, 0)
-      val executorSupervisor = new ExecutorRestartPolicy(
-        maxNrOfRetries = 3, withinTimeRange = 1.seconds)
-      executorSupervisor.addTaskToExecutor(executorId1, taskId)
-      assert(executorSupervisor.allowRestartExecutor(executorId1))
-      assert(executorSupervisor.allowRestartExecutor(executorId1))
-      executorSupervisor.addTaskToExecutor(executorId2, taskId)
-      assert(executorSupervisor.allowRestartExecutor(executorId2))
-      assert(!executorSupervisor.allowRestartExecutor(executorId2))
-      Thread.sleep(1000)
-      assert(executorSupervisor.allowRestartExecutor(executorId2))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
deleted file mode 100644
index 6cd70d9..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/HistoryMetricsServiceSpec.scala
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.Await
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.ClientToMaster.QueryHistoryMetrics
-import io.gearpump.cluster.MasterToClient.HistoryMetrics
-import io.gearpump.cluster.TestUtil
-import io.gearpump.metrics.Metrics.{Counter, Histogram, Meter}
-import io.gearpump.util.HistoryMetricsService
-import io.gearpump.util.HistoryMetricsService._
-
-class HistoryMetricsServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
-  val count = 2
-  val intervalMs = 10
-
-  val config = HistoryMetricsConfig(
-    retainHistoryDataHours = 72,
-    retainHistoryDataIntervalMs = 3600 * 1000,
-    retainRecentDataSeconds = 300,
-    retainRecentDataIntervalMs = 15 * 1000)
-
-  "SingleValueMetricsStore" should "retain metrics and expire old value" in {
-
-    val store = new SingleValueMetricsStore(count, intervalMs)
-
-    var now = 0L
-    // Only 1 data point will be kept in @intervalMs
-    store.add(Counter("count", 1), now)
-    store.add(Counter("count", 2), now)
-
-    now = now + intervalMs + 1
-
-    // Only 1 data point will be kept in @intervalMs
-    store.add(Counter("count", 3), now)
-    store.add(Counter("count", 4), now)
-
-    now = now + intervalMs + 1
-
-    // Only 1 data point will be kept in @intervalMs
-    // expire oldest data point, because we only keep @count records
-    store.add(Counter("count", 5), now)
-    store.add(Counter("count", 6), now)
-
-    val result = store.read
-    assert(result.size == count)
-
-    // The oldest value is expired
-    assert(result.head.value.asInstanceOf[Counter].value == 3L)
-
-    // The newest value is inserted
-    assert(result.last.value.asInstanceOf[Counter].value == 5L)
-  }
-
-  val meterTemplate = Meter("meter", 0, 0, 0, "s")
-
-  "HistogramMetricsStore" should "retain corse-grain history and fine-grain recent data" in {
-    val store = new HistogramMetricsStore(config)
-    val a = Histogram(null, 100, 0, 0, 0, 0, 0)
-    val b = Histogram(null, 200, 0, 0, 0, 0, 0)
-    val c = Histogram(null, 50, 0, 0, 0, 0, 0)
-
-    store.add(a)
-    store.add(b)
-    store.add(c)
-
-    assert(store.readLatest.map(_.value) == List(c))
-    assert(store.readRecent.map(_.value) == List(a))
-    assert(store.readHistory.map(_.value) == List(a))
-  }
-
-  "MeterMetricsStore" should "retain corse-grain history and fine-grain recent data" in {
-    val store = new MeterMetricsStore(config)
-
-    val a = Meter(null, 1, 100, 0, null)
-    val b = Meter(null, 1, 200, 0, null)
-    val c = Meter(null, 1, 50, 0, null)
-
-    store.add(a)
-    store.add(b)
-    store.add(c)
-
-    assert(store.readLatest.map(_.value) == List(c))
-    assert(store.readRecent.map(_.value) == List(a))
-    assert(store.readHistory.map(_.value) == List(a))
-  }
-
-  "CounterMetricsStore" should "retain corse-grain history and fine-grain recent data" in {
-    val store = new CounterMetricsStore(config)
-    val a = Counter(null, 50)
-    val b = Counter(null, 100)
-    val c = Counter(null, 150)
-
-    store.add(a)
-    store.add(b)
-    store.add(c)
-
-    assert(store.readLatest.map(_.value) == List(c))
-    assert(store.readRecent.map(_.value) == List(a))
-    assert(store.readHistory.map(_.value) == List(a))
-  }
-
-  "HistoryMetricsService" should
-    "retain lastest metrics data and allow user to query metrics by path" in {
-    implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-    val appId = 0
-    val service = system.actorOf(Props(new HistoryMetricsService("app0", config)))
-    service ! Counter("metric.counter", 0)
-    service ! Meter("metric.meter", 0, 0, 0, null)
-    service ! Histogram("metric.histogram", 0, 0, 0, 0, 0, 0)
-
-    val client = TestProbe()
-
-    // Filters metrics with path "metric.counter"
-    client.send(service, QueryHistoryMetrics("metric.counter"))
-    import scala.concurrent.duration._
-    client.expectMsgPF(3.seconds) {
-      case history: HistoryMetrics =>
-        assert(history.path == "metric.counter")
-        val metricList = history.metrics
-        metricList.foreach(metricItem =>
-          assert(metricItem.value.isInstanceOf[Counter])
-        )
-    }
-
-    // Filters metrics with path "metric.meter"
-    client.send(service, QueryHistoryMetrics("metric.meter"))
-    client.expectMsgPF(3.seconds) {
-      case history: HistoryMetrics =>
-        assert(history.path == "metric.meter")
-        val metricList = history.metrics
-        metricList.foreach(metricItem =>
-          assert(metricItem.value.isInstanceOf[Meter])
-        )
-    }
-
-    // Filters metrics with path "metric.histogram"
-    client.send(service, QueryHistoryMetrics("metric.histogram"))
-    client.expectMsgPF(3.seconds) {
-      case history: HistoryMetrics =>
-        assert(history.path == "metric.histogram")
-        val metricList = history.metrics
-        metricList.foreach(metricItem =>
-          assert(metricItem.value.isInstanceOf[Histogram])
-        )
-    }
-
-    // Filters metrics with path prefix "metric", all metrics which can
-    // match the path prefix will be retained.
-    client.send(service, QueryHistoryMetrics("metric"))
-    client.expectMsgPF(3.seconds) {
-      case history: HistoryMetrics =>
-        val metricList = history.metrics
-
-        var counterFound = false
-        var meterFound = false
-        var histogramFound = false
-
-        metricList.foreach(metricItem =>
-          metricItem.value match {
-            case v: Counter => counterFound = true
-            case v: Meter => meterFound = true
-            case v: Histogram => histogramFound = true
-            case _ => // Skip
-          }
-        )
-
-        // All kinds of metric type are reserved.
-        assert(counterFound && meterFound && histogramFound)
-    }
-
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
deleted file mode 100644
index b391196..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.{Await, Future}
-
-import akka.actor.ActorSystem
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppJar, TestUtil}
-import io.gearpump.jarstore.FilePath
-import io.gearpump.partitioner.{HashPartitioner, Partitioner}
-import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.streaming.{DAG, ProcessorDescription, _}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class JarSchedulerSpec extends WordSpec with Matchers {
-  val mockJar1 = AppJar("jar1", FilePath("path"))
-  val mockJar2 = AppJar("jar2", FilePath("path"))
-  val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 1,
-    jar = mockJar1)
-  val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 1,
-    jar = mockJar1)
-  val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask2].getName, parallelism = 2,
-    jar = mockJar2)
-  val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
-
-  import scala.concurrent.duration._
-
-  "JarScheduler" should {
-    "schedule tasks depends on app jar" in {
-      val system = ActorSystem("JarSchedulerSpec")
-      implicit val dispatcher = system.dispatcher
-      val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system)
-      manager.setDag(dag, Future {
-        0L
-      })
-      val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified))
-      val result = Await.result(manager.getResourceRequestDetails(), 15.seconds)
-      assert(result.length == 1)
-      assert(result.head.jar == mockJar1)
-      assert(result.head.requests.deep == requests.deep)
-
-      val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0,
-        Resource(2)), 15.seconds)
-      assert(tasks.contains(TaskId(0, 0)))
-      assert(tasks.contains(TaskId(1, 0)))
-
-      val newDag = replaceDAG(dag, 1, task3, 1)
-
-      manager.setDag(newDag, Future {
-        0
-      })
-      val requestDetails = Await.result(manager.getResourceRequestDetails().
-        map(_.sortBy(_.jar.name)), 15.seconds)
-      assert(requestDetails.length == 2)
-      assert(requestDetails.last.jar == mockJar2)
-      assert(requestDetails.last.requests.deep == requests.deep)
-
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-
-  def replaceDAG(
-      dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int)
-    : DAG = {
-    val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
-      newProcessor.life.birth)
-    val newProcessorMap = dag.processors ++
-      Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
-        newProcessor.id -> newProcessor)
-    val newGraph = dag.graph.subGraph(oldProcessorId).
-      replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
-    new DAG(newVersion, newProcessorMap, newGraph)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
deleted file mode 100644
index 2e07def..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.streaming.appmaster.TaskLocator.Localities
-import io.gearpump.streaming.task.TaskId
-
-class TaskLocatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  it should "serialize/deserialize correctly" in {
-    val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1, 2))))
-    Localities.toJson(localities)
-
-    localities.localities.mapValues(_.toList) shouldBe
-      Localities.fromJson(Localities.toJson(localities)).localities.mapValues(_.toList)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
deleted file mode 100644
index 8153fce..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
-import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppJar, TestUtil, UserConfig}
-import io.gearpump.jarstore.FilePath
-import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
-import io.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, TaskRegistered}
-import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask
-import io.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut
-import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess}
-import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange}
-import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorResourceUsageSummary, SetTaskManager, StartExecutors, _}
-import io.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetail
-import io.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, Task2}
-import io.gearpump.streaming.executor.Executor.RestartTasks
-import io.gearpump.streaming.task.{StartTime, TaskContext, _}
-import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId}
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-import io.gearpump.{Message, TimeStamp}
-
-class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
-  implicit var system: ActorSystem = null
-
-  val task1Class = classOf[Task1].getName
-  val task2Class = classOf[Task2].getName
-
-  val mockJar = AppJar("jar_for_test", FilePath("path"))
-  val task1 = ProcessorDescription(id = 0, taskClass = task1Class, parallelism = 1, jar = mockJar)
-  val task2 = ProcessorDescription(id = 1, taskClass = task2Class, parallelism = 1, jar = mockJar)
-
-  val dag: DAG = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
-  val dagVersion = 0
-
-  val task1LaunchData = TaskLaunchData(task1, Subscriber.of(processorId = 0, dag))
-  val task2LaunchData = TaskLaunchData(task2, Subscriber.of(processorId = 1, dag))
-
-  val appId = 0
-
-  val resource = Resource(2)
-  val workerId = WorkerId(0, 0L)
-  val executorId = 0
-
-  override def beforeEach(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterEach(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "recover by requesting new executors when executor stopped unexpectedly" in {
-    val env = bootUp
-    import env._
-    implicit val dispatcher = system.dispatcher
-
-    val resourceRequest = Array(ResourceRequest(resource, workerId))
-    when(scheduler.executorFailed(executorId)).thenReturn(Future {
-      Some(ResourceRequestDetail(mockJar,
-        resourceRequest))
-    })
-
-    taskManager ! ExecutorStopped(executorId)
-
-    // When one executor stop, it will also trigger the recovery by restart
-    // existing executors
-    executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
-
-    // Asks for new executors
-    val returned = executorManager.receiveN(1).head.asInstanceOf[StartExecutors]
-    assert(returned.resources.deep == resourceRequest.deep)
-    executorManager.reply(StartExecutorsTimeOut)
-
-    // TaskManager cannot handle the TimeOut error itself, escalate to appmaster.
-    appMaster.expectMsg(AllocateResourceTimeOut)
-  }
-
-  it should "recover by restarting existing executors when message loss happen" in {
-    val env = bootUp
-    import env._
-
-    taskManager ! ReplayFromTimestampWindowTrailingEdge(appId)
-
-    // Restart the executors so that we can replay from minClock
-    executorManager.expectMsg(BroadCast(RestartTasks(dagVersion)))
-  }
-
-  import io.gearpump.streaming.appmaster.TaskManager.TaskChangeRegistry
-  "TaskChangeRegistry" should "track all modified task registration" in {
-    val tasks = List(TaskId(0, 0), TaskId(0, 1))
-    val registry = new TaskChangeRegistry(tasks)
-    registry.taskChanged(TaskId(0, 0))
-    registry.taskChanged(TaskId(0, 1))
-    assert(registry.allTaskChanged)
-  }
-
-  "DAGDiff" should "track all the DAG migration impact" in {
-    val defaultEdge = PartitionerDescription(null)
-    val a = ProcessorDescription(id = 1, taskClass = null, parallelism = 1)
-    val b = ProcessorDescription(id = 2, taskClass = null, parallelism = 1)
-    val c = ProcessorDescription(id = 3, taskClass = null, parallelism = 1)
-    val left = Graph(a ~ defaultEdge ~> b, a ~ defaultEdge ~> c)
-
-    val d = ProcessorDescription(id = 4, taskClass = null, parallelism = 1)
-    val right = left.copy
-    right.addVertex(d)
-    right.addEdge(c, defaultEdge, d)
-    val e = a.copy(life = LifeTime(0, 0))
-    right.replaceVertex(a, e)
-
-    val diff = TaskManager.migrate(DAG(left), DAG(right, version = 1))
-    diff.addedProcessors shouldBe List(d.id)
-
-    diff.modifiedProcessors shouldBe List(a.id)
-
-    diff.impactedUpstream shouldBe List(c.id)
-  }
-
-  private def bootUp: Env = {
-
-    implicit val dispatcher = system.dispatcher
-
-    val executorManager = TestProbe()
-    val clockService = TestProbe()
-    val appMaster = TestProbe()
-    val executor = TestProbe()
-
-    val scheduler = mock(classOf[JarScheduler])
-
-    val dagManager = TestProbe()
-
-    val taskManager = system.actorOf(
-      Props(new TaskManager(appId, dagManager.ref, scheduler, executorManager.ref, clockService.ref,
-        appMaster.ref, "appName")))
-
-    dagManager.expectMsgType[WatchChange]
-    executorManager.expectMsgType[SetTaskManager]
-
-    // Step1: first transition from Unitialized to ApplicationReady
-    executorManager.expectMsgType[ExecutorResourceUsageSummary]
-    dagManager.expectMsgType[NewDAGDeployed]
-
-    // Step2: Get Additional Resource Request
-    when(scheduler.getResourceRequestDetails())
-      .thenReturn(Future {
-        Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource,
-          WorkerId.unspecified))))
-      })
-
-    // Step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG
-    dagManager.expectMsg(GetLatestDAG)
-    dagManager.reply(LatestDAG(dag))
-
-    // Step4: Start remote Executors.
-    // received Broadcast
-    executorManager.expectMsg(BroadCast(StartDynamicDag(dag.version)))
-    executorManager.expectMsgType[StartExecutors]
-
-    when(scheduler.scheduleTask(mockJar, workerId, executorId, resource))
-      .thenReturn(Future(List(TaskId(0, 0), TaskId(1, 0))))
-
-    // Step5: Executor is started.
-    executorManager.reply(ExecutorStarted(executorId, resource, workerId, Some(mockJar)))
-
-    // Step6: Prepare to start Task. First GetTaskLaunchData.
-    val taskLaunchData: PartialFunction[Any, TaskLaunchData] = {
-      case GetTaskLaunchData(_, 0, executorStarted) =>
-        task1LaunchData.copy(context = executorStarted)
-      case GetTaskLaunchData(_, 1, executorStarted) =>
-        task2LaunchData.copy(context = executorStarted)
-    }
-
-    val launchData1 = dagManager.expectMsgPF()(taskLaunchData)
-    dagManager.reply(launchData1)
-
-    val launchData2 = dagManager.expectMsgPF()(taskLaunchData)
-    dagManager.reply(launchData2)
-
-    // Step7: Launch Task
-    val launchTaskMatch: PartialFunction[Any, RegisterTask] = {
-      case UniCast(executorId, launch: LaunchTasks) =>
-        RegisterTask(launch.taskId.head, executorId, HostPort("127.0.0.1:3000"))
-    }
-
-    // Taskmanager should return the latest start clock to task(0,0)
-    clockService.expectMsg(GetStartClock)
-    clockService.reply(StartClock(0))
-
-    // Step8: Task is started. registerTask.
-    val registerTask1 = executorManager.expectMsgPF()(launchTaskMatch)
-    taskManager.tell(registerTask1, executor.ref)
-    executor.expectMsgType[TaskRegistered]
-
-    val registerTask2 = executorManager.expectMsgPF()(launchTaskMatch)
-    taskManager.tell(registerTask2, executor.ref)
-    executor.expectMsgType[TaskRegistered]
-
-    // Step9: start broadcasting TaskLocations.
-    import scala.concurrent.duration._
-    assert(executorManager.expectMsgPF(5.seconds) {
-      case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[TaskLocationsReady]
-    })
-
-    // Step10: Executor confirm it has received TaskLocationsReceived(version, executorId)
-    taskManager.tell(TaskLocationsReceived(dag.version, executorId), executor.ref)
-
-    // Step11: Tell ClockService to update DAG.
-    clockService.expectMsgType[ChangeToNewDAG]
-    clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp]))
-
-    // Step12: start all tasks
-    import scala.concurrent.duration._
-    assert(executorManager.expectMsgPF(5.seconds) {
-      case BroadCast(startAllTasks) => startAllTasks.isInstanceOf[StartAllTasks]
-    })
-
-    // Step13, Tell executor Manager the updated usage status of executors.
-    executorManager.expectMsgType[ExecutorResourceUsageSummary]
-
-    // Step14: transition from DynamicDAG to ApplicationReady
-    Env(executorManager, clockService, appMaster, executor, taskManager, scheduler)
-  }
-}
-
-object TaskManagerSpec {
-  case class Env(
-      executorManager: TestProbe,
-      clockService: TestProbe,
-      appMaster: TestProbe,
-      executor: TestProbe,
-      taskManager: ActorRef,
-      scheduler: JarScheduler)
-
-  class Task1(taskContext: TaskContext, userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = {}
-    override def onNext(msg: Message): Unit = {}
-  }
-
-  class Task2(taskContext: TaskContext, userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = {}
-    override def onNext(msg: Message): Unit = {}
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
deleted file mode 100644
index e8417ea..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskRegistrySpec.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.streaming.appmaster.TaskRegistry.{Accept, Reject, TaskLocation, TaskLocations}
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.transport.HostPort
-class TaskRegistrySpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
-  it should "maintain registered tasks" in {
-    val task0 = TaskId(0, 0)
-    val task1 = TaskId(0, 1)
-    val task2 = TaskId(0, 2)
-
-    val register = new TaskRegistry(expectedTasks = List(task0, task1, task2))
-    val host1 = HostPort("127.0.0.1:3000")
-    val host2 = HostPort("127.0.0.1:3001")
-
-    val executorId = 0
-    assert(Accept == register.registerTask(task0, TaskLocation(executorId, host1)))
-    assert(Accept == register.registerTask(task1, TaskLocation(executorId, host1)))
-    assert(Accept == register.registerTask(task2, TaskLocation(executorId, host2)))
-
-    assert(Reject == register.registerTask(TaskId(100, 0), TaskLocation(executorId, host2)))
-
-    assert(register.isAllTasksRegistered)
-    val TaskLocations(taskLocations) = register.getTaskLocations
-    val tasksOnHost1 = taskLocations.get(host1).get
-    val tasksOnHost2 = taskLocations.get(host2).get
-    assert(tasksOnHost1.contains(task0))
-    assert(tasksOnHost1.contains(task1))
-    assert(tasksOnHost2.contains(task2))
-
-    assert(register.getExecutorId(task0) == Some(executorId))
-    assert(register.isTaskRegisteredForExecutor(executorId))
-
-    register.processorExecutors(0) shouldBe Map(
-      executorId -> List(task0, task1, task2)
-    )
-
-    register.usedResource.resources shouldBe Map(
-      executorId -> Resource(3)
-    )
-  }
-}