You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:57 UTC
[61/82] [abbrv] incubator-flink git commit: Fixed JobManagerITCase to
properly wait for task managers to deregister their tasks. Replaced the
scheduler's execution service with akka's futures. Introduced
TestStreamEnvironment to use ForkableFlinkMiniCluster
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 6689f93..0e28ab6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -25,9 +25,8 @@ import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph,
AbstractJobVertex}
import Tasks._
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
-import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingJobManagerMessages}
-import TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound,
-ResponseExecutionGraph, RequestExecutionGraph}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.{TestingUtils}
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -68,461 +67,387 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
expectNoMsg()
}
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, executionGraph) => executionGraph
- case ExecutionGraphNotFound(jobID) => fail(s"The execution graph for job ID ${jobID} " +
- s"was not retrievable.")
- }
-
- executionGraph.getRegisteredExecutions.size should equal(0)
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
} finally {
cluster.stop()
}
}
- "support immediate scheduling of a single vertex" in {
- val num_tasks = 133
- val vertex = new AbstractJobVertex("Test Vertex")
- vertex.setParallelism(num_tasks)
- vertex.setInvokableClass(classOf[NoOpInvokable])
-
- val jobGraph = new JobGraph("Test Job", vertex)
-
- val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jm = cluster.getJobManager
+ "support immediate scheduling of a single vertex" in {
+ val num_tasks = 133
+ val vertex = new AbstractJobVertex("Test Vertex")
+ vertex.setParallelism(num_tasks)
+ vertex.setInvokableClass(classOf[NoOpInvokable])
- try {
- val availableSlots = AkkaUtils.ask[Int](jm, RequestTotalNumberOfSlots)
- availableSlots should equal(num_tasks)
+ val jobGraph = new JobGraph("Test Job", vertex)
- within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ val cluster = TestingUtils.startTestingCluster(num_tasks)
+ val jm = cluster.getJobManager
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
- val result = expectMsgType[JobResultSuccess]
+ try {
+ val availableSlots = AkkaUtils.ask[Int](jm, RequestTotalNumberOfSlots)
+ availableSlots should equal(num_tasks)
- result.jobID should equal(jobGraph.getJobID)
- }
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! SubmitJob(jobGraph)
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ val result = expectMsgType[JobResultSuccess]
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ result.jobID should equal(jobGraph.getJobID)
}
- "support queued scheduling of a single vertex" in {
- val num_tasks = 111
-
- val vertex = new AbstractJobVertex("Test Vertex")
- vertex.setParallelism(num_tasks)
- vertex.setInvokableClass(classOf[NoOpInvokable])
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
- val jobGraph = new JobGraph("Test job", vertex)
- jobGraph.setAllowQueuedScheduling(true)
+ "support queued scheduling of a single vertex" in {
+ val num_tasks = 111
- val cluster = TestingUtils.startTestingCluster(10)
- val jm = cluster.getJobManager
+ val vertex = new AbstractJobVertex("Test Vertex")
+ vertex.setParallelism(num_tasks)
+ vertex.setInvokableClass(classOf[NoOpInvokable])
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ val jobGraph = new JobGraph("Test job", vertex)
+ jobGraph.setAllowQueuedScheduling(true)
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ val cluster = TestingUtils.startTestingCluster(10)
+ val jm = cluster.getJobManager
- val result = expectMsgType[JobResultSuccess]
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! SubmitJob(jobGraph)
- result.jobID should equal(jobGraph.getJobID)
- }
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ val result = expectMsgType[JobResultSuccess]
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ result.jobID should equal(jobGraph.getJobID)
}
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
- "support forward jobs" in {
- val num_tasks = 31
- val sender = new AbstractJobVertex("Sender")
- val receiver = new AbstractJobVertex("Receiver")
-
- sender.setInvokableClass(classOf[Sender])
- receiver.setInvokableClass(classOf[Receiver])
-
- sender.setParallelism(num_tasks)
- receiver.setParallelism(num_tasks)
+ "support forward jobs" in {
+ val num_tasks = 31
+ val sender = new AbstractJobVertex("Sender")
+ val receiver = new AbstractJobVertex("Receiver")
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ sender.setInvokableClass(classOf[Sender])
+ receiver.setInvokableClass(classOf[Receiver])
- val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+ sender.setParallelism(num_tasks)
+ receiver.setParallelism(num_tasks)
- val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
- val jm = cluster.getJobManager
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
+ val jm = cluster.getJobManager
- val result = expectMsgType[JobResultSuccess]
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! SubmitJob(jobGraph)
- result.jobID should equal(jobGraph.getJobID)
- }
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ val result = expectMsgType[JobResultSuccess]
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ result.jobID should equal(jobGraph.getJobID)
}
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
- "support bipartite job" in {
- val num_tasks = 31
- val sender = new AbstractJobVertex("Sender")
- val receiver = new AbstractJobVertex("Receiver")
-
- sender.setInvokableClass(classOf[Sender])
- receiver.setInvokableClass(classOf[AgnosticReceiver])
-
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ "support bipartite job" in {
+ val num_tasks = 31
+ val sender = new AbstractJobVertex("Sender")
+ val receiver = new AbstractJobVertex("Receiver")
- val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
+ sender.setInvokableClass(classOf[Sender])
+ receiver.setInvokableClass(classOf[AgnosticReceiver])
- val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
- val jm = cluster.getJobManager
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
- expectMsgType[JobResultSuccess]
- }
+ val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
+ val jm = cluster.getJobManager
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! SubmitJob(jobGraph)
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ expectMsgType[JobResultSuccess]
}
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
- "support two input job failing edge mismatch" in {
- val num_tasks = 11
- val sender1 = new AbstractJobVertex("Sender1")
- val sender2 = new AbstractJobVertex("Sender2")
- val receiver = new AbstractJobVertex("Receiver")
-
- sender1.setInvokableClass(classOf[Sender])
- sender2.setInvokableClass(classOf[Sender])
- receiver.setInvokableClass(classOf[AgnosticReceiver])
-
- sender1.setParallelism(num_tasks)
- sender2.setParallelism(2 * num_tasks)
- receiver.setParallelism(3 * num_tasks)
+ "support two input job failing edge mismatch" in {
+ val num_tasks = 1
+ val sender1 = new AbstractJobVertex("Sender1")
+ val sender2 = new AbstractJobVertex("Sender2")
+ val receiver = new AbstractJobVertex("Receiver")
- receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
- receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
+ sender1.setInvokableClass(classOf[Sender])
+ sender2.setInvokableClass(classOf[Sender])
+ receiver.setInvokableClass(classOf[AgnosticReceiver])
- val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
+ sender1.setParallelism(num_tasks)
+ sender2.setParallelism(2 * num_tasks)
+ receiver.setParallelism(3 * num_tasks)
- val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
- val jm = cluster.getJobManager
+ receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
+ val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
- expectMsgType[JobResultFailed]
- }
+ val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
+ val jm = cluster.getJobManager
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! SubmitJob(jobGraph)
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ expectMsgType[JobResultFailed]
}
- "support two input job" in {
- val num_tasks = 11
- val sender1 = new AbstractJobVertex("Sender1")
- val sender2 = new AbstractJobVertex("Sender2")
- val receiver = new AbstractJobVertex("Receiver")
-
- sender1.setInvokableClass(classOf[Sender])
- sender2.setInvokableClass(classOf[Sender])
- receiver.setInvokableClass(classOf[AgnosticBinaryReceiver])
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
- sender1.setParallelism(num_tasks)
- sender2.setParallelism(2 * num_tasks)
- receiver.setParallelism(3 * num_tasks)
+ "support two input job" in {
+ val num_tasks = 11
+ val sender1 = new AbstractJobVertex("Sender1")
+ val sender2 = new AbstractJobVertex("Sender2")
+ val receiver = new AbstractJobVertex("Receiver")
- receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
- receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
+ sender1.setInvokableClass(classOf[Sender])
+ sender2.setInvokableClass(classOf[Sender])
+ receiver.setInvokableClass(classOf[AgnosticBinaryReceiver])
- val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
+ sender1.setParallelism(num_tasks)
+ sender2.setParallelism(2 * num_tasks)
+ receiver.setParallelism(3 * num_tasks)
- val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
- val jm = cluster.getJobManager
+ receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
- expectMsgType[JobResultSuccess]
- }
+ val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
+ val jm = cluster.getJobManager
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! SubmitJob(jobGraph)
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ expectMsgType[JobResultSuccess]
}
- "handle job with a failing sender vertex" in {
- val num_tasks = 100
- val sender = new AbstractJobVertex("Sender")
- val receiver = new AbstractJobVertex("Receiver")
-
- sender.setInvokableClass(classOf[ExceptionSender])
- receiver.setInvokableClass(classOf[Receiver])
-
- sender.setParallelism(num_tasks)
- receiver.setParallelism(num_tasks)
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ "handle job with a failing sender vertex" in {
+ val num_tasks = 100
+ val sender = new AbstractJobVertex("Sender")
+ val receiver = new AbstractJobVertex("Receiver")
- val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+ sender.setInvokableClass(classOf[ExceptionSender])
+ receiver.setInvokableClass(classOf[Receiver])
- val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jm = cluster.getJobManager
+ sender.setParallelism(num_tasks)
+ receiver.setParallelism(num_tasks)
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! RequestTotalNumberOfSlots
- expectMsg(num_tasks)
- }
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
- within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
- expectMsgType[JobResultFailed]
- }
+ val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ val cluster = TestingUtils.startTestingCluster(num_tasks)
+ val jm = cluster.getJobManager
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! RequestTotalNumberOfSlots
+ expectMsg(num_tasks)
}
- "handle job with an occasionally failing sender vertex" in {
- val num_tasks = 100
- val sender = new AbstractJobVertex("Sender")
- val receiver = new AbstractJobVertex("Receiver")
-
- sender.setInvokableClass(classOf[SometimesExceptionSender])
- receiver.setInvokableClass(classOf[Receiver])
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! SubmitJob(jobGraph)
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ expectMsgType[JobResultFailed]
+ }
- sender.setParallelism(num_tasks)
- receiver.setParallelism(num_tasks)
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ "handle job with an occasionally failing sender vertex" in {
+ val num_tasks = 100
+ val sender = new AbstractJobVertex("Sender")
+ val receiver = new AbstractJobVertex("Receiver")
- val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+ sender.setInvokableClass(classOf[SometimesExceptionSender])
+ receiver.setInvokableClass(classOf[Receiver])
- val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jm = cluster.getJobManager
+ sender.setParallelism(num_tasks)
+ receiver.setParallelism(num_tasks)
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! RequestTotalNumberOfSlots
- expectMsg(num_tasks)
- }
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
- within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
- expectMsgType[JobResultFailed]
- }
+ val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ val cluster = TestingUtils.startTestingCluster(num_tasks)
+ val jm = cluster.getJobManager
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! RequestTotalNumberOfSlots
+ expectMsg(num_tasks)
}
- "handle job with a failing receiver vertex" in {
- val num_tasks = 200
- val sender = new AbstractJobVertex("Sender")
- val receiver = new AbstractJobVertex("Receiver")
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! SubmitJob(jobGraph)
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ expectMsgType[JobResultFailed]
+ }
- sender.setInvokableClass(classOf[Sender])
- receiver.setInvokableClass(classOf[ExceptionReceiver])
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
- sender.setParallelism(num_tasks)
- receiver.setParallelism(num_tasks)
+ "handle job with a failing receiver vertex" in {
+ val num_tasks = 200
+ val sender = new AbstractJobVertex("Sender")
+ val receiver = new AbstractJobVertex("Receiver")
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ sender.setInvokableClass(classOf[Sender])
+ receiver.setInvokableClass(classOf[ExceptionReceiver])
- val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+ sender.setParallelism(num_tasks)
+ receiver.setParallelism(num_tasks)
- val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
- val jm = cluster.getJobManager
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! SubmitJob(jobGraph)
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
- expectMsgType[JobResultFailed]
- }
+ val jobGraph = new JobGraph("Pointwise job", sender, receiver)
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
+ val jm = cluster.getJobManager
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! SubmitJob(jobGraph)
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ expectMsgType[JobResultFailed]
}
- "handle job with all vertices failing during instantiation" in {
- val num_tasks = 200
- val sender = new AbstractJobVertex("Sender")
- val receiver = new AbstractJobVertex("Receiver")
-
- sender.setInvokableClass(classOf[InstantiationErrorSender])
- receiver.setInvokableClass(classOf[Receiver])
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
- sender.setParallelism(num_tasks)
- receiver.setParallelism(num_tasks)
+ "handle job with all vertices failing during instantiation" in {
+ val num_tasks = 200
+ val sender = new AbstractJobVertex("Sender")
+ val receiver = new AbstractJobVertex("Receiver")
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ sender.setInvokableClass(classOf[InstantiationErrorSender])
+ receiver.setInvokableClass(classOf[Receiver])
- val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+ sender.setParallelism(num_tasks)
+ receiver.setParallelism(num_tasks)
- val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jm = cluster.getJobManager
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! RequestTotalNumberOfSlots
- expectMsg(num_tasks)
+ val jobGraph = new JobGraph("Pointwise job", sender, receiver)
- jm ! SubmitJob(jobGraph)
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
- expectMsgType[JobResultFailed]
- }
+ val cluster = TestingUtils.startTestingCluster(num_tasks)
+ val jm = cluster.getJobManager
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! RequestTotalNumberOfSlots
+ expectMsg(num_tasks)
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ jm ! SubmitJob(jobGraph)
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ expectMsgType[JobResultFailed]
}
- "handle job with some vertices failing during instantiation" in {
- val num_tasks = 200
- val sender = new AbstractJobVertex("Sender")
- val receiver = new AbstractJobVertex("Receiver")
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
- sender.setInvokableClass(classOf[SometimesInstantiationErrorSender])
- receiver.setInvokableClass(classOf[Receiver])
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
- sender.setParallelism(num_tasks)
- receiver.setParallelism(num_tasks)
+ "handle job with some vertices failing during instantiation" in {
+ val num_tasks = 200
+ val sender = new AbstractJobVertex("Sender")
+ val receiver = new AbstractJobVertex("Receiver")
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ sender.setInvokableClass(classOf[SometimesInstantiationErrorSender])
+ receiver.setInvokableClass(classOf[Receiver])
- val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+ sender.setParallelism(num_tasks)
+ receiver.setParallelism(num_tasks)
- val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jm = cluster.getJobManager
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
- try {
- within(TestingUtils.TESTING_DURATION) {
- jm ! RequestTotalNumberOfSlots
- expectMsg(num_tasks)
+ val jobGraph = new JobGraph("Pointwise job", sender, receiver)
- jm ! SubmitJob(jobGraph)
- expectMsg(SubmissionSuccess(jobGraph.getJobID))
- expectMsgType[JobResultFailed]
- }
+ val cluster = TestingUtils.startTestingCluster(num_tasks)
+ val jm = cluster.getJobManager
- val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
- RequestExecutionGraph(jobGraph.getJobID)) match {
- case ExecutionGraphFound(_, eg) => eg
- case ExecutionGraphNotFound(jobID) =>
- fail(s"The execution graph for job ID ${jobID} was not retrievable.")
- }
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ jm ! RequestTotalNumberOfSlots
+ expectMsg(num_tasks)
- executionGraph.getRegisteredExecutions.size should equal(0)
- } finally {
- cluster.stop()
- }
+ jm ! SubmitJob(jobGraph)
+ expectMsg(SubmissionSuccess(jobGraph.getJobID))
+ expectMsgType[JobResultFailed]
}
+
+ jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+ expectMsg(true)
+ } finally {
+ cluster.stop()
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9961ada..5a51265 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -44,7 +44,7 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(
override def startTaskManager(index: Int)(implicit system: ActorSystem) = {
val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) =
- TaskManager.parseConfiguration(FlinkMiniCluster.HOSTNAME, configuration, true)
+ TaskManager.parseConfiguration(HOSTNAME, configuration, true)
system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 9782b72..67a8934 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -19,15 +19,18 @@
package org.apache.flink.runtime.testingUtils
import akka.actor.{ActorRef, Props}
+import akka.pattern.{ask, pipe}
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning,
-WaitForAllVerticesToBeRunning, ExecutionGraphFound, RequestExecutionGraph}
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusChanged,
+ExecutionStateChanged}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenTaskRemoved
import scala.collection.convert.WrapAsScala
+import scala.concurrent.{Await, Future}
trait TestingJobManager extends ActorLogMessages with WrapAsScala {
@@ -72,6 +75,22 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
if(cleanup){
waitForAllVerticesToBeRunning.remove(jobID)
}
+ case NotifyWhenJobRemoved(jobID) => {
+ val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager)
+
+ val responses = tms.map{
+ tm =>
+ (tm ? NotifyWhenJobRemoved(jobID))(timeout).mapTo[Boolean]
+ }
+
+ import context.dispatcher
+ val f = Future.sequence(responses)
+
+ val t = Await.result(f, timeout)
+
+ sender() ! true
+// Future.fold(responses)(true)(_ & _) pipeTo sender()
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 3b34955..7941226 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -37,4 +37,5 @@ object TestingJobManagerMessages {
case class WaitForAllVerticesToBeRunning(jobID: JobID)
case class AllVerticesRunning(jobID: JobID)
+ case class NotifyWhenJobRemoved(jobID: JobID)
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 5c6cca1..31a43cb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,16 +19,20 @@
package org.apache.flink.runtime.testingUtils
import akka.actor.ActorRef
+import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
import org.apache.flink.runtime.{ActorLogMessages}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
+import scala.concurrent.duration._
trait TestingTaskManager extends ActorLogMessages {
- self: TaskManager =>
+ that: TaskManager =>
val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+ val waitForJobRemoval = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
abstract override def receiveWithLogMessages = {
receiveTestMessages orElse super.receiveWithLogMessages
@@ -51,7 +55,32 @@ trait TestingTaskManager extends ActorLogMessages {
case None =>
}
case RequestBroadcastVariablesWithReferences => {
- sender() ! ResponseBroadcastVariablesWithReferences(bcVarManager.getNumberOfVariablesWithReferences)
+ sender() ! ResponseBroadcastVariablesWithReferences(
+ bcVarManager.getNumberOfVariablesWithReferences)
+ }
+ case NotifyWhenJobRemoved(jobID) => {
+ if(runningTasks.values.exists(_.getJobID == jobID)){
+ val set = waitForJobRemoval.getOrElse(jobID, Set())
+ waitForJobRemoval += (jobID -> (set + sender()))
+ import context.dispatcher
+ context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID))
+ }else{
+ waitForJobRemoval.get(jobID) match {
+ case Some(listeners) => (listeners + sender()) foreach (_ ! true)
+ case None => sender() ! true
+ }
+ }
+ }
+ case CheckIfJobRemoved(jobID) => {
+ if(runningTasks.values.forall(_.getJobID != jobID)){
+ waitForJobRemoval.get(jobID) match {
+ case Some(listeners) => listeners foreach (_ ! true)
+ case None =>
+ }
+ }else{
+ import context.dispatcher
+ context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID))
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index 24d7e5c..cb5282e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -19,10 +19,12 @@
package org.apache.flink.runtime.testingUtils
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.taskmanager.Task
object TestingTaskManagerMessages{
case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
+
case object RequestRunningTasks
case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
import collection.JavaConverters._
@@ -30,4 +32,6 @@ object TestingTaskManagerMessages{
}
case object RequestBroadcastVariablesWithReferences
case class ResponseBroadcastVariablesWithReferences(number: Int)
+
+ case class CheckIfJobRemoved(jobID: JobID)
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index df0915e..dddef4a 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -208,7 +208,7 @@ under the License.
</execution>
</executions>
</plugin>
-
+
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
@@ -232,7 +232,7 @@ under the License.
<outputEncoding>UTF-8</outputEncoding>
</configuration>
</plugin>
-
+
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index d28e9dd..dcab0b8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index 19c2f90..ce19a65 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index f25dd6c..8685cc5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 7e9e4e5..4f8f632 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 171db60..539f96c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 40071b7..5468637 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index f5ab3ba..06e40d8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml
index d77318e..22dafda 100644
--- a/flink-test-utils/pom.xml
+++ b/flink-test-utils/pom.xml
@@ -80,4 +80,138 @@ under the License.
<scope>provided</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.1.4</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xms128m</jvmArg>
+ <jvmArg>-Xmx512m</jvmArg>
+ </jvmArgs>
+ <compilerPlugins>
+ <compilerPlugin>
+ <groupId>org.scalamacros</groupId>
+ <artifactId>paradise_${scala.version}</artifactId>
+ <version>${scala.macros.version}</version>
+ </compilerPlugin>
+ </compilerPlugins>
+ </configuration>
+ </plugin>
+
+ <!-- Eclipse Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+
+ <!-- Adding scala source directories to build path -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <!-- Add src/test/scala to eclipse build path -->
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+ <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+ <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+ <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+ <outputEncoding>UTF-8</outputEncoding>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index b73f961..c30d976 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -46,7 +46,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assert;
@@ -68,7 +67,7 @@ public abstract class AbstractTestBase {
protected final Configuration config;
- protected TestingCluster executor;
+ protected ForkableFlinkMiniCluster executor;
private final List<File> tempFiles;
@@ -97,16 +96,15 @@ public abstract class AbstractTestBase {
// --------------------------------------------------------------------------------------------
public void startCluster() throws Exception {
- Thread.sleep(250);
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
- this.executor = new TestingCluster(config);
+ this.executor = new ForkableFlinkMiniCluster(config);
}
-
+
public void stopCluster() throws Exception {
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 83dd73b..03f60b3 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -31,7 +31,6 @@ import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.api.java.CollectionEnvironment;
@@ -194,12 +193,12 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
private static final class TestEnvironment extends ExecutionEnvironment {
- private final FlinkMiniCluster executor;
+ private final ForkableFlinkMiniCluster executor;
private JobExecutionResult latestResult;
- private TestEnvironment(FlinkMiniCluster executor, int degreeOfParallelism) {
+ private TestEnvironment(ForkableFlinkMiniCluster executor, int degreeOfParallelism) {
this.executor = executor;
setDegreeOfParallelism(degreeOfParallelism);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
new file mode 100644
index 0000000..f82a4a6
--- /dev/null
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util
+
+import akka.actor.{Props, ActorSystem, ActorRef}
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingTaskManager
+
+/**
+ * Local mini cluster whose network ports can be shifted by a fork number, and whose task
+ * managers are created with the [[TestingTaskManager]] trait mixed in.
+ *
+ * The fork number is read from the `forkNumber` system property; distinct fork numbers lead
+ * to port ranges that start 300 ports apart.
+ *
+ * @param userConfiguration configuration handed on to [[LocalFlinkMiniCluster]]
+ */
+class ForkableFlinkMiniCluster(userConfiguration: Configuration) extends
+LocalFlinkMiniCluster(userConfiguration) {
+
+  /**
+   * Builds the cluster configuration from a copy of `userConfiguration`.
+   *
+   * The `forkNumber` system property is parsed as an integer; a missing or non-numeric value
+   * counts as -1, in which case no port is changed. Otherwise the job manager IPC port, the
+   * task manager IPC port and the task manager data port are set to `1024 + forkNumber * 300`
+   * plus 0, 100 and 200 respectively. The copy is then passed to the parent implementation.
+   *
+   * @param userConfiguration configuration to copy and extend
+   * @return the result of the parent's `generateConfiguration` for the adjusted copy
+   */
+  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
+    val rawForkNumber = System.getProperty("forkNumber")
+
+    val forkNumber =
+      try {
+        Integer.parseInt(rawForkNumber)
+      } catch {
+        // also covers an unset property: parseInt(null) throws NumberFormatException
+        case _: NumberFormatException => -1
+      }
+
+    val config = userConfiguration.clone()
+
+    forkNumber match {
+      case -1 => // no usable fork number: leave the ports untouched
+      case fork =>
+        val portBase = 1024 + fork * 300
+        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, portBase)
+        config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, portBase + 100)
+        config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, portBase + 200)
+    }
+
+    super.generateConfiguration(config)
+  }
+
+  /**
+   * Creates the task manager actor with the given index in `system`.
+   *
+   * Works on a copy of `configuration`: the task manager IPC port and data port are each
+   * increased by `index`, but only if the configured value is greater than zero. The actor is
+   * a [[TaskManager]] with [[TestingTaskManager]] mixed in and is named
+   * `TaskManager.TASK_MANAGER_NAME + index`.
+   *
+   * @param index  number of the task manager; used as port offset and as actor name suffix
+   * @param system actor system in which the actor is created
+   * @return reference to the newly created task manager actor
+   */
+  override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+
+    // port keys paired with the default used when the key is not set
+    val portSettings = Seq(
+      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY ->
+        ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT,
+      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY ->
+        ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
+
+    // NOTE(review): values <= 0 are left alone, presumably because they do not denote a
+    // fixed port - confirm against TaskManager.parseConfiguration
+    for ((key, defaultPort) <- portSettings) {
+      val port = config.getInteger(key, defaultPort)
+      if (port > 0) {
+        config.setInteger(key, port + index)
+      }
+    }
+
+    val (connInfo, jmAkkaURL, tmConfig, netConfig) =
+      TaskManager.parseConfiguration(HOSTNAME, config, false)
+
+    val taskManagerProps = Props(
+      new TaskManager(connInfo, jmAkkaURL, tmConfig, netConfig) with TestingTaskManager)
+
+    system.actorOf(taskManagerProps, TaskManager.TASK_MANAGER_NAME + index)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 6347cb5..303ee3d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
import org.slf4j.Logger;
@@ -69,7 +70,7 @@ public abstract class CancellingTestBase {
// --------------------------------------------------------------------------------------------
- protected LocalFlinkMiniCluster executor;
+ protected ForkableFlinkMiniCluster executor;
protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
@@ -87,7 +88,7 @@ public abstract class CancellingTestBase {
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
- this.executor = new LocalFlinkMiniCluster(config);
+ this.executor = new ForkableFlinkMiniCluster(config);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
index 4bd3fc7..9046d2d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -24,8 +24,8 @@ import java.io.FileWriter;
import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.testdata.KMeansData;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
@@ -36,7 +36,7 @@ public class PackagedProgramEndToEndITCase {
@Test
public void testEverything() {
- LocalFlinkMiniCluster cluster = null;
+ ForkableFlinkMiniCluster cluster = null;
File points = null;
File clusters = null;
@@ -64,7 +64,7 @@ public class PackagedProgramEndToEndITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
- cluster = new LocalFlinkMiniCluster(config);
+ cluster = new ForkableFlinkMiniCluster(config);
RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRPCPort());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
index e8b716b..06013b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
@@ -19,7 +19,6 @@
package org.apache.flink.test.util;
import akka.actor.ActorRef;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.junit.Assert;
import org.apache.flink.runtime.client.JobClient;
@@ -120,7 +119,7 @@ public abstract class FailingTestBase extends RecordAPITestBase {
// reference to the timeout thread
private final Thread timeoutThread;
// cluster to submit the job to.
- private final FlinkMiniCluster executor;
+ private final ForkableFlinkMiniCluster executor;
// job graph of the failing job (submitted first)
private final JobGraph failingJob;
// job graph of the working job (submitted after return from failing job)
@@ -129,8 +128,8 @@ public abstract class FailingTestBase extends RecordAPITestBase {
private volatile Exception error;
- public SubmissionThread(Thread timeoutThread, FlinkMiniCluster executor, JobGraph failingJob,
- JobGraph job) {
+ public SubmissionThread(Thread timeoutThread, ForkableFlinkMiniCluster executor, JobGraph
+ failingJob, JobGraph job) {
this.timeoutThread = timeoutThread;
this.executor = executor;
this.failingJob = failingJob;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index a923af6..4024304 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
index 60651d1..8ab41ec 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
index 1155d73..587bbf3 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,