You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:29 UTC

[15/19] flink git commit: [FLINK-5747] [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 8a9a4ce..be26e7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -19,16 +19,13 @@
 package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TerminalJobStatusListener;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -113,7 +110,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
 		ExecutionGraph executionGraph = (ExecutionGraph) ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
 
-		TestJobStatusListener testListener = new TestJobStatusListener();
+		TerminalJobStatusListener testListener = new TerminalJobStatusListener();
 		executionGraph.registerJobStatusListener(testListener);
 
 		cluster.revokeLeadership();
@@ -146,20 +143,4 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
 		return jobGraph;
 	}
-
-	public static class TestJobStatusListener implements JobStatusListener {
-
-		private final OneShotLatch terminalStateLatch = new OneShotLatch();
-
-		public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException {
-			terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
-		}
-
-		@Override
-		public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
-			if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) {
-				terminalStateLatch.trigger();
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index d9a1896..f656622 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
@@ -46,7 +47,13 @@ public class MiniClusterITCase extends TestLogger {
 		cfg.setUseSingleRpcService();
 
 		MiniCluster miniCluster = new MiniCluster(cfg);
-		executeJob(miniCluster);
+		try {
+			miniCluster.start();
+			executeJob(miniCluster);
+		}
+		finally {
+			miniCluster.shutdown();
+		}
 	}
 
 	@Test
@@ -55,7 +62,13 @@ public class MiniClusterITCase extends TestLogger {
 		cfg.setUseRpcServicePerComponent();
 
 		MiniCluster miniCluster = new MiniCluster(cfg);
-		executeJob(miniCluster);
+		try {
+			miniCluster.start();
+			executeJob(miniCluster);
+		}
+		finally {
+			miniCluster.shutdown();
+		}
 	}
 
 	@Test
@@ -64,7 +77,13 @@ public class MiniClusterITCase extends TestLogger {
 		cfg.setNumJobManagers(3);
 
 		MiniCluster miniCluster = new MiniCluster(cfg);
-		executeJob(miniCluster);
+		try {
+			miniCluster.start();
+			executeJob(miniCluster);
+		}
+		finally {
+			miniCluster.shutdown();
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -72,8 +91,6 @@ public class MiniClusterITCase extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private static void executeJob(MiniCluster miniCluster) throws Exception {
-		miniCluster.start();
-
 		JobGraph job = getSimpleJob();
 		miniCluster.runJobBlocking(job);
 	}
@@ -86,6 +103,7 @@ public class MiniClusterITCase extends TestLogger {
 
 		JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
 		jg.setAllowQueuedScheduling(true);
+		jg.setScheduleMode(ScheduleMode.EAGER);
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 2007d35..63dc35d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
  * parallelism can be set via {@link #setParallelism(int)}.
  */
-@Public
+@Internal
 public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);