You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/22 08:48:47 UTC

[3/4] flink git commit: [FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 8c7360a..03e3535 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -135,6 +135,7 @@ public class RescalePartitionerTest extends TestLogger {
 
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId,
 			jobName,
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 0ed0d83..f0622dd 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -41,8 +41,9 @@ import scala.concurrent.duration.FiniteDuration
   * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent tasks in the
+  * @param futureExecutor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor for blocking io operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -54,7 +55,8 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -69,7 +71,8 @@ class TestingYarnJobManager(
     metricRegistry : Option[MetricRegistry])
   extends YarnJobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 70894b0..24eb45c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -56,6 +57,8 @@ import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Option;
+import scala.Some;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -188,33 +191,34 @@ public class YarnApplicationMasterRunner {
 			numberProcessors,
 			new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
 
-		try {
-			// ------- (1) load and parse / validate all configurations -------
+		// ------- (1) load and parse / validate all configurations -------
+
+		// loading all config values here has the advantage that the program fails fast, if any
+		// configuration problem occurs
 
-			// loading all config values here has the advantage that the program fails fast, if any
-			// configuration problem occurs
+		final String currDir = ENV.get(Environment.PWD.key());
+		require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
 
-			final String currDir = ENV.get(Environment.PWD.key());
-			require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+		// Note that we use the "appMasterHostname" given by YARN here, to make sure
+		// we use the hostnames given by YARN consistently throughout akka.
+		// for akka "localhost" and "localhost.localdomain" are different actors.
+		final String appMasterHostname = ENV.get(Environment.NM_HOST.key());
+		require(appMasterHostname != null,
+			"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
 
-			// Note that we use the "appMasterHostname" given by YARN here, to make sure
-			// we use the hostnames given by YARN consistently throughout akka.
-			// for akka "localhost" and "localhost.localdomain" are different actors.
-			final String appMasterHostname = ENV.get(Environment.NM_HOST.key());
-			require(appMasterHostname != null,
-				"ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
+		LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
 
-			LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
+		// Flink configuration
+		final Map<String, String> dynamicProperties =
+			FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+		LOG.debug("YARN dynamic properties: {}", dynamicProperties);
 
-			// Flink configuration
-			final Map<String, String> dynamicProperties =
-				FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
-			LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+		final Configuration config = createConfiguration(currDir, dynamicProperties);
 
-			final Configuration config = createConfiguration(currDir, dynamicProperties);
+		// Hadoop/Yarn configuration (loads config data automatically from classpath files)
+		final YarnConfiguration yarnConfig = new YarnConfiguration();
 
-			// Hadoop/Yarn configuration (loads config data automatically from classpath files)
-			final YarnConfiguration yarnConfig = new YarnConfiguration();
+		try {
 
 			final int taskManagerContainerMemory;
 			final int numInitialTaskManagers;
@@ -295,8 +299,8 @@ public class YarnApplicationMasterRunner {
 				actorSystem,
 				futureExecutor,
 				ioExecutor,
-				new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
-				scala.Option.<String>empty(),
+				new Some<>(JobManager.JOB_MANAGER_NAME()),
+				Option.<String>empty(),
 				getJobManagerClass(),
 				getArchivistClass())._1();
 
@@ -329,7 +333,6 @@ public class YarnApplicationMasterRunner {
 
 			ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
 
-
 			// 4: Process reapers
 			// The process reapers ensure that upon unexpected actor death, the process exits
 			// and does not stay lingering around unresponsive
@@ -364,6 +367,9 @@ public class YarnApplicationMasterRunner {
 				}
 			}
 
+			futureExecutor.shutdownNow();
+			ioExecutor.shutdownNow();
+
 			return INIT_ERROR_EXIT_CODE;
 		}
 
@@ -382,8 +388,11 @@ public class YarnApplicationMasterRunner {
 			}
 		}
 
-		futureExecutor.shutdownNow();
-		ioExecutor.shutdownNow();
+		ExecutorUtils.gracefulShutdown(
+			AkkaUtils.getTimeout(config).toMillis(),
+			TimeUnit.MILLISECONDS,
+			futureExecutor,
+			ioExecutor);
 
 		return 0;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index d7df66a..28615e6 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -46,8 +46,9 @@ import scala.language.postfixOps
   * to start/administer/stop the Yarn session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executor Execution context which is used to execute concurrent tasks in the
+  * @param futureExecutor Execution context which is used to execute concurrent tasks in the
   *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param ioExecutor for blocking io operations
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
@@ -59,7 +60,8 @@ import scala.language.postfixOps
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -74,7 +76,8 @@ class YarnJobManager(
     metricsRegistry: Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,