You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:29 UTC
[15/19] flink git commit: [FLINK-5747] [distributed coordination]
Eager scheduling allocates slots and deploys tasks in bulk
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 8a9a4ce..be26e7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -19,16 +19,13 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TerminalJobStatusListener;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -113,7 +110,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
ExecutionGraph executionGraph = (ExecutionGraph) ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
- TestJobStatusListener testListener = new TestJobStatusListener();
+ TerminalJobStatusListener testListener = new TerminalJobStatusListener();
executionGraph.registerJobStatusListener(testListener);
cluster.revokeLeadership();
@@ -146,20 +143,4 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
return jobGraph;
}
-
- public static class TestJobStatusListener implements JobStatusListener {
-
- private final OneShotLatch terminalStateLatch = new OneShotLatch();
-
- public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException {
- terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
- if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) {
- terminalStateLatch.trigger();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index d9a1896..f656622 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
@@ -46,7 +47,13 @@ public class MiniClusterITCase extends TestLogger {
cfg.setUseSingleRpcService();
MiniCluster miniCluster = new MiniCluster(cfg);
- executeJob(miniCluster);
+ try {
+ miniCluster.start();
+ executeJob(miniCluster);
+ }
+ finally {
+ miniCluster.shutdown();
+ }
}
@Test
@@ -55,7 +62,13 @@ public class MiniClusterITCase extends TestLogger {
cfg.setUseRpcServicePerComponent();
MiniCluster miniCluster = new MiniCluster(cfg);
- executeJob(miniCluster);
+ try {
+ miniCluster.start();
+ executeJob(miniCluster);
+ }
+ finally {
+ miniCluster.shutdown();
+ }
}
@Test
@@ -64,7 +77,13 @@ public class MiniClusterITCase extends TestLogger {
cfg.setNumJobManagers(3);
MiniCluster miniCluster = new MiniCluster(cfg);
- executeJob(miniCluster);
+ try {
+ miniCluster.start();
+ executeJob(miniCluster);
+ }
+ finally {
+ miniCluster.shutdown();
+ }
}
// ------------------------------------------------------------------------
@@ -72,8 +91,6 @@ public class MiniClusterITCase extends TestLogger {
// ------------------------------------------------------------------------
private static void executeJob(MiniCluster miniCluster) throws Exception {
- miniCluster.start();
-
JobGraph job = getSimpleJob();
miniCluster.runJobBlocking(job);
}
@@ -86,6 +103,7 @@ public class MiniClusterITCase extends TestLogger {
JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
jg.setAllowQueuedScheduling(true);
+ jg.setScheduleMode(ScheduleMode.EAGER);
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 2007d35..63dc35d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.api.environment;
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
* <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
* parallelism can be set via {@link #setParallelism(int)}.
*/
-@Public
+@Internal
public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);