You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:12 UTC

[48/63] [abbrv] git commit: Fix logging in EventCollector Fix comparisons (null pointer safe) in JobManagerITCase

Fix logging in EventCollector
Fix comparisons (null pointer safe) in JobManagerITCase


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d5d3a080
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d5d3a080
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d5d3a080

Branch: refs/heads/master
Commit: d5d3a08031b1eae806c371f5b3fee95991c87f57
Parents: 1fdd7e6
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 15 03:45:01 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:49 2014 +0200

----------------------------------------------------------------------
 .../runtime/jobmanager/EventCollector.java      |  6 +-
 .../runtime/jobmanager/JobManagerITCase.java    | 80 ++++++++++++++------
 2 files changed, 59 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d3a080/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
index 551dce2..99b5374 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
@@ -26,8 +26,6 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.runtime.event.job.AbstractEvent;
 import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
 import org.apache.flink.runtime.event.job.JobEvent;
@@ -46,6 +44,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
 import org.apache.flink.runtime.profiling.ProfilingListener;
 import org.apache.flink.runtime.profiling.types.ProfilingEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The event collector collects events which occurred during the execution of a job and prepares them
@@ -54,7 +54,7 @@ import org.apache.flink.runtime.profiling.types.ProfilingEvent;
  */
 public final class EventCollector extends TimerTask implements ProfilingListener {
 
-	private static final Log LOG = LogFactory.getLog(EventCollector.class);
+	private static final Logger LOG = LoggerFactory.getLogger(EventCollector.class);
 
 	/**
 	 * The execution listener wrapper is an auxiliary class. It is required

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d3a080/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index a0224de..f4d74a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -177,13 +177,13 @@ public class JobManagerITCase {
 					}
 					
 					assertTrue("The job did not finish successfully.", success);
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
 				}
 				
-				assertEquals(0, eg.getRegisteredExecutions().size());
-				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -223,6 +223,9 @@ public class JobManagerITCase {
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 				
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -231,13 +234,13 @@ public class JobManagerITCase {
 				if (eg != null) {
 					eg.waitForJobEnd();
 					assertEquals(JobStatus.FINISHED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
 				}
 				
-				assertEquals(0, eg.getRegisteredExecutions().size());
-				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -282,6 +285,9 @@ public class JobManagerITCase {
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -290,6 +296,8 @@ public class JobManagerITCase {
 				if (eg != null) {
 					eg.waitForJobEnd();
 					assertEquals(JobStatus.FINISHED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
@@ -297,7 +305,6 @@ public class JobManagerITCase {
 				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
-				assertEquals(0, eg.getRegisteredExecutions().size());
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
@@ -340,6 +347,9 @@ public class JobManagerITCase {
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -348,13 +358,13 @@ public class JobManagerITCase {
 				if (eg != null) {
 					eg.waitForJobEnd();
 					assertEquals(JobStatus.FINISHED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
 				}
 				
-				assertEquals(0, eg.getRegisteredExecutions().size());
-				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -372,7 +382,7 @@ public class JobManagerITCase {
 	@Test
 	public void testTwoInputJobFailingEdgeMismatch() {
 		
-		final int NUM_TASKS = 2;
+		final int NUM_TASKS = 11;
 		
 		try {
 			final AbstractJobVertex sender1 = new AbstractJobVertex("Sender1");
@@ -384,15 +394,15 @@ public class JobManagerITCase {
 			receiver.setInvokableClass(AgnosticReceiver.class);
 			
 			sender1.setParallelism(NUM_TASKS);
-			sender2.setParallelism(NUM_TASKS);
-			receiver.setParallelism(NUM_TASKS);
+			sender2.setParallelism(2*NUM_TASKS);
+			receiver.setParallelism(3*NUM_TASKS);
 			
 			receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE);
 			receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE);
 			
 			final JobGraph jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2);
 			
-			final JobManager jm = startJobManager(3 * NUM_TASKS);
+			final JobManager jm = startJobManager(6*NUM_TASKS);
 			
 			final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
@@ -403,6 +413,9 @@ public class JobManagerITCase {
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -411,6 +424,8 @@ public class JobManagerITCase {
 				if (eg != null) {
 					eg.waitForJobEnd();
 					assertEquals(JobStatus.FAILED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
@@ -418,7 +433,6 @@ public class JobManagerITCase {
 				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
-				assertEquals(0, eg.getRegisteredExecutions().size());
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
 			}
 			finally {
@@ -465,6 +479,9 @@ public class JobManagerITCase {
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -473,13 +490,13 @@ public class JobManagerITCase {
 				if (eg != null) {
 					eg.waitForJobEnd();
 					assertEquals(JobStatus.FINISHED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
 				}
 				
-				assertEquals(0, eg.getRegisteredExecutions().size());
-				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -526,6 +543,9 @@ public class JobManagerITCase {
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -534,13 +554,13 @@ public class JobManagerITCase {
 				if (eg != null) {
 					eg.waitForJobEnd();
 					assertEquals(JobStatus.FAILED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
 				}
 				
-				assertEquals(0, eg.getRegisteredExecutions().size());
-				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -587,6 +607,9 @@ public class JobManagerITCase {
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -595,13 +618,13 @@ public class JobManagerITCase {
 				if (eg != null) {
 					eg.waitForJobEnd();
 					assertEquals(JobStatus.FAILED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
 				}
 				
-				assertEquals(0, eg.getRegisteredExecutions().size());
-				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -647,6 +670,9 @@ public class JobManagerITCase {
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 				
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -655,13 +681,13 @@ public class JobManagerITCase {
 				if (eg != null) {
 					eg.waitForJobEnd();
 					assertEquals(JobStatus.FAILED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
 				}
 				
-				assertEquals(0, eg.getRegisteredExecutions().size());
-				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -711,6 +737,9 @@ public class JobManagerITCase {
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -719,13 +748,13 @@ public class JobManagerITCase {
 				if (eg != null) {
 					eg.waitForJobEnd();
 					assertEquals(JobStatus.FAILED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
 				}
 				
-				assertEquals(0, eg.getRegisteredExecutions().size());
-				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -775,6 +804,9 @@ public class JobManagerITCase {
 				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
 				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
 				
 				// monitor the execution
@@ -783,13 +815,13 @@ public class JobManagerITCase {
 				if (eg != null) {
 					eg.waitForJobEnd();
 					assertEquals(JobStatus.FAILED, eg.getState());
+					
+					assertEquals(0, eg.getRegisteredExecutions().size());
 				}
 				else {
 					// already done, that was fast;
 				}
 				
-				assertEquals(0, eg.getRegisteredExecutions().size());
-				
 				// make sure that in any case, the network buffers are all returned
 				waitForTaskThreadsToBeTerminated();
 				assertEquals(bp.numBuffers(), bp.numAvailableBuffers());