You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/04/27 15:05:22 UTC

[1/2] flink git commit: [FLINK-3601] increase timeout for JobManagerTest methods

Repository: flink
Updated Branches:
  refs/heads/master 8ad264d0f -> 130a22d6c


[FLINK-3601] increase timeout for JobManagerTest methods


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/130a22d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/130a22d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/130a22d6

Branch: refs/heads/master
Commit: 130a22d6c8839018068a8a000b4a2ed6a1ab3d49
Parents: 7ec1300
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Apr 27 14:44:47 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Apr 27 14:50:17 2016 +0200

----------------------------------------------------------------------
 .../runtime/jobmanager/JobManagerTest.java      | 295 ++++++++++---------
 1 file changed, 153 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/130a22d6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index d283e91..3785fc7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -109,225 +109,236 @@ public class JobManagerTest {
 	@Test
 	public void testRequestPartitionState() throws Exception {
 		new JavaTestKit(system) {{
-			// Setup
-			TestingCluster cluster = null;
+			new Within(duration("15 seconds")) {
+				@Override
+				protected void run() {
+					// Setup
+					TestingCluster cluster = null;
 
-			try {
-				cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+					try {
+						cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
 
-				final IntermediateDataSetID rid = new IntermediateDataSetID();
+						final IntermediateDataSetID rid = new IntermediateDataSetID();
 
-				// Create a task
-				final JobVertex sender = new JobVertex("Sender");
-				sender.setParallelism(1);
-				sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
-				sender.createAndAddResultDataSet(rid, PIPELINED);
+						// Create a task
+						final JobVertex sender = new JobVertex("Sender");
+						sender.setParallelism(1);
+						sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+						sender.createAndAddResultDataSet(rid, PIPELINED);
 
-				final JobGraph jobGraph = new JobGraph("Blocking test job", new ExecutionConfig(), sender);
-				final JobID jid = jobGraph.getJobID();
+						final JobGraph jobGraph = new JobGraph("Blocking test job", new ExecutionConfig(), sender);
+						final JobID jid = jobGraph.getJobID();
 
-				final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
-						TestingUtils.TESTING_DURATION());
+						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+							TestingUtils.TESTING_DURATION());
 
-				// we can set the leader session ID to None because we don't use this gateway to send messages
-				final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+						// we can set the leader session ID to None because we don't use this gateway to send messages
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
 
-				// Submit the job and wait for all vertices to be running
-				jobManagerGateway.tell(
-						new SubmitJob(
+						// Submit the job and wait for all vertices to be running
+						jobManagerGateway.tell(
+							new SubmitJob(
 								jobGraph,
 								ListeningBehaviour.EXECUTION_RESULT),
-						testActorGateway);
-				expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+							testActorGateway);
+						expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
 
-				jobManagerGateway.tell(
-						new WaitForAllVerticesToBeRunningOrFinished(jid),
-						testActorGateway);
+						jobManagerGateway.tell(
+							new WaitForAllVerticesToBeRunningOrFinished(jid),
+							testActorGateway);
 
-				expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+						expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
 
-				// This is the mock execution ID of the task requesting the state of the partition
-				final ExecutionAttemptID receiver = new ExecutionAttemptID();
+						// This is the mock execution ID of the task requesting the state of the partition
+						final ExecutionAttemptID receiver = new ExecutionAttemptID();
 
-				// Request the execution graph to get the runtime info
-				jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway);
+						// Request the execution graph to get the runtime info
+						jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway);
 
-				final ExecutionGraph eg = expectMsgClass(ExecutionGraphFound.class)
-						.executionGraph();
+						final ExecutionGraph eg = expectMsgClass(ExecutionGraphFound.class)
+							.executionGraph();
 
-				final ExecutionVertex vertex = eg.getJobVertex(sender.getID())
-						.getTaskVertices()[0];
+						final ExecutionVertex vertex = eg.getJobVertex(sender.getID())
+							.getTaskVertices()[0];
 
-				final IntermediateResultPartition partition = vertex.getProducedPartitions()
-						.values().iterator().next();
+						final IntermediateResultPartition partition = vertex.getProducedPartitions()
+							.values().iterator().next();
 
-				final ResultPartitionID partitionId = new ResultPartitionID(
-						partition.getPartitionId(),
-						vertex.getCurrentExecutionAttempt().getAttemptId());
+						final ResultPartitionID partitionId = new ResultPartitionID(
+							partition.getPartitionId(),
+							vertex.getCurrentExecutionAttempt().getAttemptId());
 
-				// - The test ----------------------------------------------------------------------
+						// - The test ----------------------------------------------------------------------
 
-				// 1. All execution states
-				RequestPartitionState request = new RequestPartitionState(
-						jid, partitionId, receiver, rid);
+						// 1. All execution states
+						RequestPartitionState request = new RequestPartitionState(
+							jid, partitionId, receiver, rid);
 
-				for (ExecutionState state : ExecutionState.values()) {
-					ExecutionGraphTestUtils.setVertexState(vertex, state);
+						for (ExecutionState state : ExecutionState.values()) {
+							ExecutionGraphTestUtils.setVertexState(vertex, state);
 
-					jobManagerGateway.tell(request, testActorGateway);
+							jobManagerGateway.tell(request, testActorGateway);
 
-					LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
+							LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
 
-					assertEquals(PartitionState.class, lsm.message().getClass());
+							assertEquals(PartitionState.class, lsm.message().getClass());
 
-					PartitionState resp = (PartitionState) lsm.message();
+							PartitionState resp = (PartitionState) lsm.message();
 
-					assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-					assertEquals(request.taskResultId(), resp.taskResultId());
-					assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-					assertEquals(state, resp.state());
-				}
+							assertEquals(request.taskExecutionId(), resp.taskExecutionId());
+							assertEquals(request.taskResultId(), resp.taskResultId());
+							assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
+							assertEquals(state, resp.state());
+						}
 
-				// 2. Non-existing execution
-				request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid);
+						// 2. Non-existing execution
+						request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid);
 
-				jobManagerGateway.tell(request, testActorGateway);
+						jobManagerGateway.tell(request, testActorGateway);
 
-				LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
+						LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
 
-				assertEquals(PartitionState.class, lsm.message().getClass());
+						assertEquals(PartitionState.class, lsm.message().getClass());
 
-				PartitionState resp = (PartitionState) lsm.message();
+						PartitionState resp = (PartitionState) lsm.message();
 
-				assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-				assertEquals(request.taskResultId(), resp.taskResultId());
-				assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-				assertNull(resp.state());
+						assertEquals(request.taskExecutionId(), resp.taskExecutionId());
+						assertEquals(request.taskResultId(), resp.taskResultId());
+						assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
+						assertNull(resp.state());
 
-				// 3. Non-existing job
-				request = new RequestPartitionState(
-						new JobID(), new ResultPartitionID(), receiver, rid);
+						// 3. Non-existing job
+						request = new RequestPartitionState(
+							new JobID(), new ResultPartitionID(), receiver, rid);
 
-				jobManagerGateway.tell(request, testActorGateway);
+						jobManagerGateway.tell(request, testActorGateway);
 
-				lsm = expectMsgClass(LeaderSessionMessage.class);
+						lsm = expectMsgClass(LeaderSessionMessage.class);
 
-				assertEquals(PartitionState.class, lsm.message().getClass());
+						assertEquals(PartitionState.class, lsm.message().getClass());
 
-				resp = (PartitionState) lsm.message();
+						resp = (PartitionState) lsm.message();
 
-				assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-				assertEquals(request.taskResultId(), resp.taskResultId());
-				assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-				assertNull(resp.state());
-			}
-			catch (Exception e) {
-				e.printStackTrace();
-				fail(e.getMessage());
-			}
-			finally {
-				if (cluster != null) {
-					cluster.shutdown();
+						assertEquals(request.taskExecutionId(), resp.taskExecutionId());
+						assertEquals(request.taskResultId(), resp.taskResultId());
+						assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
+						assertNull(resp.state());
+					} catch (Exception e) {
+						e.printStackTrace();
+						fail(e.getMessage());
+					} finally {
+						if (cluster != null) {
+							cluster.shutdown();
+						}
+					}
 				}
-			}
+			};
 		}};
 	}
 
 	@Test
 	public void testStopSignal() throws Exception {
 		new JavaTestKit(system) {{
-			// Setup
-			TestingCluster cluster = null;
+			new Within(duration("15 seconds")) {
+				@Override
+				protected void run() {
+					// Setup
+					TestingCluster cluster = null;
 
-			try {
-				cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+					try {
+						cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
 
-				// Create a task
-				final JobVertex sender = new JobVertex("Sender");
-				sender.setParallelism(2);
-				sender.setInvokableClass(StoppableInvokable.class);
+						// Create a task
+						final JobVertex sender = new JobVertex("Sender");
+						sender.setParallelism(2);
+						sender.setInvokableClass(StoppableInvokable.class);
 
-				final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender);
-				final JobID jid = jobGraph.getJobID();
+						final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender);
+						final JobID jid = jobGraph.getJobID();
 
-				final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
-				// we can set the leader session ID to None because we don't use this gateway to send messages
-				final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+						// we can set the leader session ID to None because we don't use this gateway to send messages
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
 
-				// Submit the job and wait for all vertices to be running
-				jobManagerGateway.tell(
-						new SubmitJob(
+						// Submit the job and wait for all vertices to be running
+						jobManagerGateway.tell(
+							new SubmitJob(
 								jobGraph,
 								ListeningBehaviour.EXECUTION_RESULT),
-						testActorGateway);
-				expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+							testActorGateway);
+						expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
 
-				jobManagerGateway.tell(new WaitForAllVerticesToBeRunning(jid), testActorGateway);
-				expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+						jobManagerGateway.tell(new WaitForAllVerticesToBeRunning(jid), testActorGateway);
+						expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
 
-				jobManagerGateway.tell(new StopJob(jid), testActorGateway);
+						jobManagerGateway.tell(new StopJob(jid), testActorGateway);
 
-				// - The test ----------------------------------------------------------------------
-				expectMsgClass(StoppingSuccess.class);
+						// - The test ----------------------------------------------------------------------
+						expectMsgClass(StoppingSuccess.class);
 
-				expectMsgClass(JobManagerMessages.JobResultSuccess.class);
-			}
-			finally {
-				if (cluster != null) {
-					cluster.shutdown();
+						expectMsgClass(JobManagerMessages.JobResultSuccess.class);
+					} finally {
+						if (cluster != null) {
+							cluster.shutdown();
+						}
+					}
 				}
-			}
+			};
 		}};
 	}
 
 	@Test
 	public void testStopSignalFail() throws Exception {
 		new JavaTestKit(system) {{
-			// Setup
-			TestingCluster cluster = null;
+			new Within(duration("15 seconds")) {
+				@Override
+				protected void run() {
+					// Setup
+					TestingCluster cluster = null;
 
-			try {
-				cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+					try {
+						cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
 
-				// Create a task
-				final JobVertex sender = new JobVertex("Sender");
-				sender.setParallelism(1);
-				sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+						// Create a task
+						final JobVertex sender = new JobVertex("Sender");
+						sender.setParallelism(1);
+						sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
 
-				final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", new ExecutionConfig(), sender);
-				final JobID jid = jobGraph.getJobID();
+						final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", new ExecutionConfig(), sender);
+						final JobID jid = jobGraph.getJobID();
 
-				final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
-				// we can set the leader session ID to None because we don't use this gateway to send messages
-				final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+						// we can set the leader session ID to None because we don't use this gateway to send messages
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
 
-				// Submit the job and wait for all vertices to be running
-				jobManagerGateway.tell(
-						new SubmitJob(
+						// Submit the job and wait for all vertices to be running
+						jobManagerGateway.tell(
+							new SubmitJob(
 								jobGraph,
 								ListeningBehaviour.EXECUTION_RESULT),
-						testActorGateway);
-				expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+							testActorGateway);
+						expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
 
-				jobManagerGateway.tell(new WaitForAllVerticesToBeRunning(jid), testActorGateway);
-				expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+						jobManagerGateway.tell(new WaitForAllVerticesToBeRunning(jid), testActorGateway);
+						expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
 
-				jobManagerGateway.tell(new StopJob(jid), testActorGateway);
+						jobManagerGateway.tell(new StopJob(jid), testActorGateway);
 
-				// - The test ----------------------------------------------------------------------
-				expectMsgClass(StoppingFailure.class);
+						// - The test ----------------------------------------------------------------------
+						expectMsgClass(StoppingFailure.class);
 
-				jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway);
+						jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway);
 
-				expectMsgClass(ExecutionGraphFound.class);
-			}
-			finally {
-				if (cluster != null) {
-					cluster.shutdown();
+						expectMsgClass(ExecutionGraphFound.class);
+					} finally {
+						if (cluster != null) {
+							cluster.shutdown();
+						}
+					}
 				}
-			}
+			};
 		}};
 	}
 


[2/2] flink git commit: [FLINK-3824] ResourceManager may repeatedly connect to outdated JobManager

Posted by mx...@apache.org.
[FLINK-3824] ResourceManager may repeatedly connect to outdated JobManager

When the ResourceManager is notified of a new leading JobManager via the
LeaderRetrievalService, it tries to register with this JobManager until it
is connected. If a new leader is elected during registration, the
ResourceManager may keep trying to register with the old
one. This doesn't affect the registration with the new JobManager, but it
leaves error messages in the log file and may cause unnecessary
messages to be processed. With this change, responses to a registration
attempt are ignored once the ResourceManager is already connected to a
JobManager.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ec1300a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ec1300a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ec1300a

Branch: refs/heads/master
Commit: 7ec1300a740e9b8e1eb595bc15e752a167592e00
Parents: 8ad264d
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Apr 26 18:44:47 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Apr 27 14:50:17 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/FlinkResourceManager.java  | 26 ++++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ec1300a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 3c1a698..a5c354c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -436,22 +436,22 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceID> extend
 
 			@Override
 			public void onComplete(Throwable failure, Object msg) {
-				if (msg != null) {
-					if (msg instanceof LeaderSessionMessage &&
-						((LeaderSessionMessage) msg).message() instanceof RegisterResourceManagerSuccessful)
-					{
-						self().tell(msg, ActorRef.noSender());
-					}
-					else {
-						LOG.error("Invalid response type to registration at JobManager: {}", msg);
+				// only process if we haven't been connected in the meantime
+				if (jobManager == null) {
+					if (msg != null) {
+						if (msg instanceof LeaderSessionMessage &&
+							((LeaderSessionMessage) msg).message() instanceof RegisterResourceManagerSuccessful) {
+							self().tell(msg, ActorRef.noSender());
+						} else {
+							LOG.error("Invalid response type to registration at JobManager: {}", msg);
+							self().tell(retryMessage, ActorRef.noSender());
+						}
+					} else {
+						// no success
+						LOG.error("Resource manager could not register at JobManager", failure);
 						self().tell(retryMessage, ActorRef.noSender());
 					}
 				}
-				else {
-					// no success
-					LOG.error("Resource manager could not register at JobManager", failure);
-					self().tell(retryMessage, ActorRef.noSender());
-				}
 			}
 
 		}, context().dispatcher());