You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:12 UTC
[48/63] [abbrv] git commit: Fix logging in EventCollector Fix
comparisons (null pointer safe) in JobManagerITCase
Fix logging in EventCollector
Fix comparisons (null pointer safe) in JobManagerITCase
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d5d3a080
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d5d3a080
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d5d3a080
Branch: refs/heads/master
Commit: d5d3a08031b1eae806c371f5b3fee95991c87f57
Parents: 1fdd7e6
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 15 03:45:01 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:49 2014 +0200
----------------------------------------------------------------------
.../runtime/jobmanager/EventCollector.java | 6 +-
.../runtime/jobmanager/JobManagerITCase.java | 80 ++++++++++++++------
2 files changed, 59 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d3a080/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
index 551dce2..99b5374 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
@@ -26,8 +26,6 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
import org.apache.flink.runtime.event.job.JobEvent;
@@ -46,6 +44,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.profiling.ProfilingListener;
import org.apache.flink.runtime.profiling.types.ProfilingEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The event collector collects events which occurred during the execution of a job and prepares them
@@ -54,7 +54,7 @@ import org.apache.flink.runtime.profiling.types.ProfilingEvent;
*/
public final class EventCollector extends TimerTask implements ProfilingListener {
- private static final Log LOG = LogFactory.getLog(EventCollector.class);
+ private static final Logger LOG = LoggerFactory.getLogger(EventCollector.class);
/**
* The execution listener wrapper is an auxiliary class. It is required
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d5d3a080/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index a0224de..f4d74a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -177,13 +177,13 @@ public class JobManagerITCase {
}
assertTrue("The job did not finish successfully.", success);
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
}
- assertEquals(0, eg.getRegisteredExecutions().size());
-
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -223,6 +223,9 @@ public class JobManagerITCase {
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -231,13 +234,13 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
assertEquals(JobStatus.FINISHED, eg.getState());
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
}
- assertEquals(0, eg.getRegisteredExecutions().size());
-
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -282,6 +285,9 @@ public class JobManagerITCase {
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -290,6 +296,8 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
assertEquals(JobStatus.FINISHED, eg.getState());
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
@@ -297,7 +305,6 @@ public class JobManagerITCase {
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
- assertEquals(0, eg.getRegisteredExecutions().size());
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
@@ -340,6 +347,9 @@ public class JobManagerITCase {
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -348,13 +358,13 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
assertEquals(JobStatus.FINISHED, eg.getState());
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
}
- assertEquals(0, eg.getRegisteredExecutions().size());
-
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -372,7 +382,7 @@ public class JobManagerITCase {
@Test
public void testTwoInputJobFailingEdgeMismatch() {
- final int NUM_TASKS = 2;
+ final int NUM_TASKS = 11;
try {
final AbstractJobVertex sender1 = new AbstractJobVertex("Sender1");
@@ -384,15 +394,15 @@ public class JobManagerITCase {
receiver.setInvokableClass(AgnosticReceiver.class);
sender1.setParallelism(NUM_TASKS);
- sender2.setParallelism(NUM_TASKS);
- receiver.setParallelism(NUM_TASKS);
+ sender2.setParallelism(2*NUM_TASKS);
+ receiver.setParallelism(3*NUM_TASKS);
receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE);
receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE);
final JobGraph jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2);
- final JobManager jm = startJobManager(3 * NUM_TASKS);
+ final JobManager jm = startJobManager(6*NUM_TASKS);
final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
@@ -403,6 +413,9 @@ public class JobManagerITCase {
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -411,6 +424,8 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
assertEquals(JobStatus.FAILED, eg.getState());
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
@@ -418,7 +433,6 @@ public class JobManagerITCase {
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
- assertEquals(0, eg.getRegisteredExecutions().size());
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
}
finally {
@@ -465,6 +479,9 @@ public class JobManagerITCase {
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -473,13 +490,13 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
assertEquals(JobStatus.FINISHED, eg.getState());
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
}
- assertEquals(0, eg.getRegisteredExecutions().size());
-
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -526,6 +543,9 @@ public class JobManagerITCase {
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -534,13 +554,13 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
assertEquals(JobStatus.FAILED, eg.getState());
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
}
- assertEquals(0, eg.getRegisteredExecutions().size());
-
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -587,6 +607,9 @@ public class JobManagerITCase {
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -595,13 +618,13 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
assertEquals(JobStatus.FAILED, eg.getState());
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
}
- assertEquals(0, eg.getRegisteredExecutions().size());
-
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -647,6 +670,9 @@ public class JobManagerITCase {
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -655,13 +681,13 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
assertEquals(JobStatus.FAILED, eg.getState());
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
}
- assertEquals(0, eg.getRegisteredExecutions().size());
-
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -711,6 +737,9 @@ public class JobManagerITCase {
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -719,13 +748,13 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
assertEquals(JobStatus.FAILED, eg.getState());
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
}
- assertEquals(0, eg.getRegisteredExecutions().size());
-
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
@@ -775,6 +804,9 @@ public class JobManagerITCase {
JobSubmissionResult result = jm.submitJob(jobGraph);
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
// monitor the execution
@@ -783,13 +815,13 @@ public class JobManagerITCase {
if (eg != null) {
eg.waitForJobEnd();
assertEquals(JobStatus.FAILED, eg.getState());
+
+ assertEquals(0, eg.getRegisteredExecutions().size());
}
else {
// already done, that was fast;
}
- assertEquals(0, eg.getRegisteredExecutions().size());
-
// make sure that in any case, the network buffers are all returned
waitForTaskThreadsToBeTerminated();
assertEquals(bp.numBuffers(), bp.numAvailableBuffers());