You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:57 UTC

[61/82] [abbrv] incubator-flink git commit: Fixed JobManagerITCase to properly wait for task managers to deregister their tasks. Replaced the scheduler's execution service with akka's futures. Introduced TestStreamEnvironment to use ForkableFlinkMiniClus

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 6689f93..0e28ab6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -25,9 +25,8 @@ import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph,
 AbstractJobVertex}
 import Tasks._
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
-import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingJobManagerMessages}
-import TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound,
-ResponseExecutionGraph, RequestExecutionGraph}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.{TestingUtils}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -68,461 +67,387 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
           expectNoMsg()
         }
 
-        val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-          RequestExecutionGraph(jobGraph.getJobID)) match {
-          case ExecutionGraphFound(_, executionGraph) => executionGraph
-          case ExecutionGraphNotFound(jobID) => fail(s"The execution graph for job ID ${jobID} " +
-            s"was not retrievable.")
-        }
-
-        executionGraph.getRegisteredExecutions.size should equal(0)
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
       } finally {
         cluster.stop()
       }
     }
 
-        "support immediate scheduling of a single vertex" in {
-          val num_tasks = 133
-          val vertex = new AbstractJobVertex("Test Vertex")
-          vertex.setParallelism(num_tasks)
-          vertex.setInvokableClass(classOf[NoOpInvokable])
-
-          val jobGraph = new JobGraph("Test Job", vertex)
-
-          val cluster = TestingUtils.startTestingCluster(num_tasks)
-          val jm = cluster.getJobManager
+    "support immediate scheduling of a single vertex" in {
+      val num_tasks = 133
+      val vertex = new AbstractJobVertex("Test Vertex")
+      vertex.setParallelism(num_tasks)
+      vertex.setInvokableClass(classOf[NoOpInvokable])
 
-          try {
-            val availableSlots = AkkaUtils.ask[Int](jm, RequestTotalNumberOfSlots)
-            availableSlots should equal(num_tasks)
+      val jobGraph = new JobGraph("Test Job", vertex)
 
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
+      val cluster = TestingUtils.startTestingCluster(num_tasks)
+      val jm = cluster.getJobManager
 
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              val result = expectMsgType[JobResultSuccess]
+      try {
+        val availableSlots = AkkaUtils.ask[Int](jm, RequestTotalNumberOfSlots)
+        availableSlots should equal(num_tasks)
 
-              result.jobID should equal(jobGraph.getJobID)
-            }
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          val result = expectMsgType[JobResultSuccess]
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          result.jobID should equal(jobGraph.getJobID)
         }
 
-        "support queued scheduling of a single vertex" in {
-          val num_tasks = 111
-
-          val vertex = new AbstractJobVertex("Test Vertex")
-          vertex.setParallelism(num_tasks)
-          vertex.setInvokableClass(classOf[NoOpInvokable])
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          val jobGraph = new JobGraph("Test job", vertex)
-          jobGraph.setAllowQueuedScheduling(true)
+    "support queued scheduling of a single vertex" in {
+      val num_tasks = 111
 
-          val cluster = TestingUtils.startTestingCluster(10)
-          val jm = cluster.getJobManager
+      val vertex = new AbstractJobVertex("Test Vertex")
+      vertex.setParallelism(num_tasks)
+      vertex.setInvokableClass(classOf[NoOpInvokable])
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
+      val jobGraph = new JobGraph("Test job", vertex)
+      jobGraph.setAllowQueuedScheduling(true)
 
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
+      val cluster = TestingUtils.startTestingCluster(10)
+      val jm = cluster.getJobManager
 
-              val result = expectMsgType[JobResultSuccess]
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
 
-              result.jobID should equal(jobGraph.getJobID)
-            }
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+          val result = expectMsgType[JobResultSuccess]
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          result.jobID should equal(jobGraph.getJobID)
         }
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-        "support forward jobs" in {
-          val num_tasks = 31
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender.setInvokableClass(classOf[Sender])
-          receiver.setInvokableClass(classOf[Receiver])
-
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+    "support forward jobs" in {
+      val num_tasks = 31
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      sender.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[Receiver])
 
-          val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
+      val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
+      val jm = cluster.getJobManager
 
-              val result = expectMsgType[JobResultSuccess]
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
 
-              result.jobID should equal(jobGraph.getJobID)
-            }
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+          val result = expectMsgType[JobResultSuccess]
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          result.jobID should equal(jobGraph.getJobID)
         }
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-        "support bipartite job" in {
-          val num_tasks = 31
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender.setInvokableClass(classOf[Sender])
-          receiver.setInvokableClass(classOf[AgnosticReceiver])
-
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+    "support bipartite job" in {
+      val num_tasks = 31
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
+      sender.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[AgnosticReceiver])
 
-          val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
+      val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
 
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultSuccess]
-            }
+      val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
+      val jm = cluster.getJobManager
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultSuccess]
         }
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-        "support two input job failing edge mismatch" in {
-          val num_tasks = 11
-          val sender1 = new AbstractJobVertex("Sender1")
-          val sender2 = new AbstractJobVertex("Sender2")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender1.setInvokableClass(classOf[Sender])
-          sender2.setInvokableClass(classOf[Sender])
-          receiver.setInvokableClass(classOf[AgnosticReceiver])
-
-          sender1.setParallelism(num_tasks)
-          sender2.setParallelism(2 * num_tasks)
-          receiver.setParallelism(3 * num_tasks)
+    "support two input job failing edge mismatch" in {
+      val num_tasks = 1
+      val sender1 = new AbstractJobVertex("Sender1")
+      val sender2 = new AbstractJobVertex("Sender2")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-          receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
+      sender1.setInvokableClass(classOf[Sender])
+      sender2.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[AgnosticReceiver])
 
-          val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
+      sender1.setParallelism(num_tasks)
+      sender2.setParallelism(2 * num_tasks)
+      receiver.setParallelism(3 * num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
+      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
+      val jm = cluster.getJobManager
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
         }
 
-        "support two input job" in {
-          val num_tasks = 11
-          val sender1 = new AbstractJobVertex("Sender1")
-          val sender2 = new AbstractJobVertex("Sender2")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender1.setInvokableClass(classOf[Sender])
-          sender2.setInvokableClass(classOf[Sender])
-          receiver.setInvokableClass(classOf[AgnosticBinaryReceiver])
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          sender1.setParallelism(num_tasks)
-          sender2.setParallelism(2 * num_tasks)
-          receiver.setParallelism(3 * num_tasks)
+    "support two input job" in {
+      val num_tasks = 11
+      val sender1 = new AbstractJobVertex("Sender1")
+      val sender2 = new AbstractJobVertex("Sender2")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-          receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
+      sender1.setInvokableClass(classOf[Sender])
+      sender2.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[AgnosticBinaryReceiver])
 
-          val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
+      sender1.setParallelism(num_tasks)
+      sender2.setParallelism(2 * num_tasks)
+      receiver.setParallelism(3 * num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
+      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
-              expectMsgType[JobResultSuccess]
-            }
+      val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
+      val jm = cluster.getJobManager
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          expectMsgType[JobResultSuccess]
         }
 
-        "handle job with a failing sender vertex" in {
-          val num_tasks = 100
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender.setInvokableClass(classOf[ExceptionSender])
-          receiver.setInvokableClass(classOf[Receiver])
-
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+    "handle job with a failing sender vertex" in {
+      val num_tasks = 100
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      sender.setInvokableClass(classOf[ExceptionSender])
+      receiver.setInvokableClass(classOf[Receiver])
 
-          val cluster = TestingUtils.startTestingCluster(num_tasks)
-          val jm = cluster.getJobManager
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! RequestTotalNumberOfSlots
-              expectMsg(num_tasks)
-            }
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      val cluster = TestingUtils.startTestingCluster(num_tasks)
+      val jm = cluster.getJobManager
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! RequestTotalNumberOfSlots
+          expectMsg(num_tasks)
         }
 
-        "handle job with an occasionally failing sender vertex" in {
-          val num_tasks = 100
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender.setInvokableClass(classOf[SometimesExceptionSender])
-          receiver.setInvokableClass(classOf[Receiver])
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
+        }
 
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+    "handle job with an occasionally failing sender vertex" in {
+      val num_tasks = 100
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      sender.setInvokableClass(classOf[SometimesExceptionSender])
+      receiver.setInvokableClass(classOf[Receiver])
 
-          val cluster = TestingUtils.startTestingCluster(num_tasks)
-          val jm = cluster.getJobManager
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! RequestTotalNumberOfSlots
-              expectMsg(num_tasks)
-            }
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      val cluster = TestingUtils.startTestingCluster(num_tasks)
+      val jm = cluster.getJobManager
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! RequestTotalNumberOfSlots
+          expectMsg(num_tasks)
         }
 
-        "handle job with a failing receiver vertex" in {
-          val num_tasks = 200
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
+        }
 
-          sender.setInvokableClass(classOf[Sender])
-          receiver.setInvokableClass(classOf[ExceptionReceiver])
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+    "handle job with a failing receiver vertex" in {
+      val num_tasks = 200
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      sender.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[ExceptionReceiver])
 
-          val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
+      val jm = cluster.getJobManager
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
         }
 
-        "handle job with all vertices failing during instantiation" in {
-          val num_tasks = 200
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender.setInvokableClass(classOf[InstantiationErrorSender])
-          receiver.setInvokableClass(classOf[Receiver])
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+    "handle job with all vertices failing during instantiation" in {
+      val num_tasks = 200
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      sender.setInvokableClass(classOf[InstantiationErrorSender])
+      receiver.setInvokableClass(classOf[Receiver])
 
-          val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! RequestTotalNumberOfSlots
-              expectMsg(num_tasks)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val cluster = TestingUtils.startTestingCluster(num_tasks)
+      val jm = cluster.getJobManager
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! RequestTotalNumberOfSlots
+          expectMsg(num_tasks)
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
         }
 
-        "handle job with some vertices failing during instantiation" in {
-          val num_tasks = 200
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
 
-          sender.setInvokableClass(classOf[SometimesInstantiationErrorSender])
-          receiver.setInvokableClass(classOf[Receiver])
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+    "handle job with some vertices failing during instantiation" in {
+      val num_tasks = 200
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      sender.setInvokableClass(classOf[SometimesInstantiationErrorSender])
+      receiver.setInvokableClass(classOf[Receiver])
 
-          val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! RequestTotalNumberOfSlots
-              expectMsg(num_tasks)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val cluster = TestingUtils.startTestingCluster(num_tasks)
+      val jm = cluster.getJobManager
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! RequestTotalNumberOfSlots
+          expectMsg(num_tasks)
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
         }
+
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9961ada..5a51265 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -44,7 +44,7 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(
 
   override def startTaskManager(index: Int)(implicit system: ActorSystem) = {
     val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) =
-      TaskManager.parseConfiguration(FlinkMiniCluster.HOSTNAME, configuration, true)
+      TaskManager.parseConfiguration(HOSTNAME, configuration, true)
 
     system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
       networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 9782b72..67a8934 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -19,15 +19,18 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{ActorRef, Props}
+import akka.pattern.{ask, pipe}
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning,
-WaitForAllVerticesToBeRunning, ExecutionGraphFound, RequestExecutionGraph}
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusChanged,
+ExecutionStateChanged}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenTaskRemoved
 
 import scala.collection.convert.WrapAsScala
+import scala.concurrent.{Await, Future}
 
 
 trait TestingJobManager extends ActorLogMessages with WrapAsScala {
@@ -72,6 +75,22 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
       if(cleanup){
         waitForAllVerticesToBeRunning.remove(jobID)
       }
+    case NotifyWhenJobRemoved(jobID) => {
+      val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager)
+
+      val responses = tms.map{
+        tm =>
+          (tm ? NotifyWhenJobRemoved(jobID))(timeout).mapTo[Boolean]
+      }
+
+      import context.dispatcher
+      val f = Future.sequence(responses)
+
+      val t = Await.result(f, timeout)
+
+      sender() ! true
+//      Future.fold(responses)(true)(_ & _) pipeTo sender()
+    }
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 3b34955..7941226 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -37,4 +37,5 @@ object TestingJobManagerMessages {
   case class WaitForAllVerticesToBeRunning(jobID: JobID)
   case class AllVerticesRunning(jobID: JobID)
 
+  case class NotifyWhenJobRemoved(jobID: JobID)
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 5c6cca1..31a43cb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,16 +19,20 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
+import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.{ActorLogMessages}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
+import scala.concurrent.duration._
 
 trait TestingTaskManager extends ActorLogMessages {
-  self: TaskManager =>
+  that: TaskManager =>
 
   val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+  val waitForJobRemoval = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
 
   abstract override def receiveWithLogMessages = {
     receiveTestMessages orElse super.receiveWithLogMessages
@@ -51,7 +55,32 @@ trait TestingTaskManager extends ActorLogMessages {
         case None =>
       }
     case RequestBroadcastVariablesWithReferences => {
-      sender() ! ResponseBroadcastVariablesWithReferences(bcVarManager.getNumberOfVariablesWithReferences)
+      sender() ! ResponseBroadcastVariablesWithReferences(
+        bcVarManager.getNumberOfVariablesWithReferences)
+    }
+    case NotifyWhenJobRemoved(jobID) => {
+      if(runningTasks.values.exists(_.getJobID == jobID)){
+        val set = waitForJobRemoval.getOrElse(jobID, Set())
+        waitForJobRemoval += (jobID -> (set + sender()))
+        import context.dispatcher
+        context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID))
+      }else{
+        waitForJobRemoval.get(jobID) match {
+          case Some(listeners) => (listeners + sender()) foreach (_ ! true)
+          case None => sender() ! true
+        }
+      }
+    }
+    case CheckIfJobRemoved(jobID) => {
+      if(runningTasks.values.forall(_.getJobID != jobID)){
+        waitForJobRemoval.get(jobID) match {
+          case Some(listeners) => listeners foreach (_ ! true)
+          case None =>
+        }
+      }else{
+        import context.dispatcher
+        context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID))
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index 24d7e5c..cb5282e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -19,10 +19,12 @@
 package org.apache.flink.runtime.testingUtils
 
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.taskmanager.Task
 
 object TestingTaskManagerMessages{
   case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
+
   case object RequestRunningTasks
   case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
     import collection.JavaConverters._
@@ -30,4 +32,6 @@ object TestingTaskManagerMessages{
   }
   case object RequestBroadcastVariablesWithReferences
   case class ResponseBroadcastVariablesWithReferences(number: Int)
+
+  case class CheckIfJobRemoved(jobID: JobID)
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index df0915e..dddef4a 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -208,7 +208,7 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
-			
+
 			<plugin>
 				<groupId>org.scalastyle</groupId>
 				<artifactId>scalastyle-maven-plugin</artifactId>
@@ -232,7 +232,7 @@ under the License.
 					<outputEncoding>UTF-8</outputEncoding>
 				</configuration>
 			</plugin>
-			
+
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index d28e9dd..dcab0b8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index 19c2f90..ce19a65 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index f25dd6c..8685cc5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 7e9e4e5..4f8f632 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 171db60..539f96c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 40071b7..5468637 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index f5ab3ba..06e40d8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml
index d77318e..22dafda 100644
--- a/flink-test-utils/pom.xml
+++ b/flink-test-utils/pom.xml
@@ -80,4 +80,138 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+					<compilerPlugins>
+						<compilerPlugin>
+							<groupId>org.scalamacros</groupId>
+							<artifactId>paradise_${scala.version}</artifactId>
+							<version>${scala.macros.version}</version>
+						</compilerPlugin>
+					</compilerPlugins>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+
+		</plugins>
+	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index b73f961..c30d976 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -46,7 +46,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
@@ -68,7 +67,7 @@ public abstract class AbstractTestBase {
 
 	protected final Configuration config;
 	
-	protected TestingCluster executor;
+	protected ForkableFlinkMiniCluster executor;
 
 	private final List<File> tempFiles;
 
@@ -97,16 +96,15 @@ public abstract class AbstractTestBase {
 	// --------------------------------------------------------------------------------------------
 	
 	public void startCluster() throws Exception {
-		Thread.sleep(250);
 		Configuration config = new Configuration();
 		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
 		config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
 		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
-		this.executor = new TestingCluster(config);
+		this.executor = new ForkableFlinkMiniCluster(config);
 	}
-	
+
 	public void stopCluster() throws Exception {
 		try {
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 83dd73b..03f60b3 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -31,7 +31,6 @@ import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.java.CollectionEnvironment;
@@ -194,12 +193,12 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	
 	private static final class TestEnvironment extends ExecutionEnvironment {
 
-		private final FlinkMiniCluster executor;
+		private final ForkableFlinkMiniCluster executor;
 
 		private JobExecutionResult latestResult;
 		
 		
-		private TestEnvironment(FlinkMiniCluster executor, int degreeOfParallelism) {
+		private TestEnvironment(ForkableFlinkMiniCluster executor, int degreeOfParallelism) {
 			this.executor = executor;
 			setDegreeOfParallelism(degreeOfParallelism);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
new file mode 100644
index 0000000..f82a4a6
--- /dev/null
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util
+
+import akka.actor.{Props, ActorSystem, ActorRef}
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingTaskManager
+
+class ForkableFlinkMiniCluster(userConfiguration: Configuration) extends
+LocalFlinkMiniCluster(userConfiguration) {
+
+  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
+    val forNumberString = System.getProperty("forkNumber")
+
+    val forkNumber = try {
+      Integer.parseInt(forNumberString)
+    }catch{
+      case e: NumberFormatException => -1
+    }
+
+    val config = userConfiguration.clone()
+
+    if(forkNumber != -1){
+      val jobManagerRPC = 1024 + forkNumber*300
+      val taskManagerRPC = 1024 + forkNumber*300 + 100
+      val taskManagerData = 1024 + forkNumber*300 + 200
+
+      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
+      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
+      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
+
+    }
+
+    super.generateConfiguration(config)
+  }
+
+  override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+
+    val rpcPort = config.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants
+      .DEFAULT_TASK_MANAGER_IPC_PORT)
+    val dataPort = config.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
+      .DEFAULT_TASK_MANAGER_DATA_PORT)
+
+    if(rpcPort > 0){
+      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
+    }
+
+    if(dataPort > 0){
+      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
+    }
+
+    val (connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConnectionConfig) =
+      TaskManager.parseConfiguration(HOSTNAME, config, false)
+
+    system.actorOf(Props(new TaskManager(connectionInfo, jobManagerAkkaURL, taskManagerConfig,
+      networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 6347cb5..303ee3d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.junit.Assert;
 
 import org.slf4j.Logger;
@@ -69,7 +70,7 @@ public abstract class CancellingTestBase {
 
 	// --------------------------------------------------------------------------------------------
 	
-	protected LocalFlinkMiniCluster executor;
+	protected ForkableFlinkMiniCluster executor;
 
 	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 	
@@ -87,7 +88,7 @@ public abstract class CancellingTestBase {
 		Configuration config = new Configuration();
 		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
 
-		this.executor = new LocalFlinkMiniCluster(config);
+		this.executor = new ForkableFlinkMiniCluster(config);
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
index 4bd3fc7..9046d2d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -24,8 +24,8 @@ import java.io.FileWriter;
 import org.apache.flink.client.RemoteExecutor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.testdata.KMeansData;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -36,7 +36,7 @@ public class PackagedProgramEndToEndITCase {
 
 	@Test
 	public void testEverything() {
-		LocalFlinkMiniCluster cluster = null;
+		ForkableFlinkMiniCluster cluster = null;
 
 		File points = null;
 		File clusters = null;
@@ -64,7 +64,7 @@ public class PackagedProgramEndToEndITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-			cluster = new LocalFlinkMiniCluster(config);
+			cluster = new ForkableFlinkMiniCluster(config);
 
 			RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRPCPort());
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
index e8b716b..06013b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.test.util;
 
 import akka.actor.ActorRef;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.junit.Assert;
 
 import org.apache.flink.runtime.client.JobClient;
@@ -120,7 +119,7 @@ public abstract class FailingTestBase extends RecordAPITestBase {
 		// reference to the timeout thread
 		private final Thread timeoutThread;
 		// cluster to submit the job to.
-		private final FlinkMiniCluster executor;
+		private final ForkableFlinkMiniCluster executor;
 		// job graph of the failing job (submitted first)
 		private final JobGraph failingJob;
 		// job graph of the working job (submitted after return from failing job)
@@ -129,8 +128,8 @@ public abstract class FailingTestBase extends RecordAPITestBase {
 		private volatile Exception error;
 		
 
-		public SubmissionThread(Thread timeoutThread, FlinkMiniCluster executor, JobGraph failingJob,
-								JobGraph job) {
+		public SubmissionThread(Thread timeoutThread, ForkableFlinkMiniCluster executor, JobGraph
+				failingJob,	JobGraph job) {
 			this.timeoutThread = timeoutThread;
 			this.executor = executor;
 			this.failingJob = failingJob;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index a923af6..4024304 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
index 60651d1..8ab41ec 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
index 1155d73..587bbf3 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,