You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/22 08:48:47 UTC
[3/4] flink git commit: [FLINK-5085] Execute CheckpointCoordinator's
state discard calls asynchronously
http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 8c7360a..03e3535 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -135,6 +135,7 @@ public class RescalePartitionerTest extends TestLogger {
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
+ TestingUtils.defaultExecutionContext(),
jobId,
jobName,
cfg,
http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 0ed0d83..f0622dd 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -41,8 +41,9 @@ import scala.concurrent.duration.FiniteDuration
* instead of an anonymous class with the respective mixin to obtain a more readable logger name.
*
* @param flinkConfiguration Configuration object for the actor
- * @param executor Execution context which is used to execute concurrent tasks in the
+ * @param futureExecutor Execution context which is used to execute concurrent tasks in the
* [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+ * @param ioExecutor for blocking io operations
* @param instanceManager Instance manager to manage the registered
* [[org.apache.flink.runtime.taskmanager.TaskManager]]
* @param scheduler Scheduler to schedule Flink jobs
@@ -54,7 +55,8 @@ import scala.concurrent.duration.FiniteDuration
*/
class TestingYarnJobManager(
flinkConfiguration: Configuration,
- executor: Executor,
+ futureExecutor: Executor,
+ ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: Scheduler,
libraryCacheManager: BlobLibraryCacheManager,
@@ -69,7 +71,8 @@ class TestingYarnJobManager(
metricRegistry : Option[MetricRegistry])
extends YarnJobManager(
flinkConfiguration,
- executor,
+ futureExecutor,
+ ioExecutor,
instanceManager,
scheduler,
libraryCacheManager,
http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 70894b0..24eb45c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.NamedThreadFactory;
import org.apache.flink.runtime.util.SignalHandler;
@@ -56,6 +57,8 @@ import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
@@ -188,33 +191,34 @@ public class YarnApplicationMasterRunner {
numberProcessors,
new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
- try {
- // ------- (1) load and parse / validate all configurations -------
+ // ------- (1) load and parse / validate all configurations -------
+
+ // loading all config values here has the advantage that the program fails fast, if any
+ // configuration problem occurs
- // loading all config values here has the advantage that the program fails fast, if any
- // configuration problem occurs
+ final String currDir = ENV.get(Environment.PWD.key());
+ require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
- final String currDir = ENV.get(Environment.PWD.key());
- require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key());
+ // Note that we use the "appMasterHostname" given by YARN here, to make sure
+ // we use the hostnames given by YARN consistently throughout akka.
+ // for akka "localhost" and "localhost.localdomain" are different actors.
+ final String appMasterHostname = ENV.get(Environment.NM_HOST.key());
+ require(appMasterHostname != null,
+ "ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
- // Note that we use the "appMasterHostname" given by YARN here, to make sure
- // we use the hostnames given by YARN consistently throughout akka.
- // for akka "localhost" and "localhost.localdomain" are different actors.
- final String appMasterHostname = ENV.get(Environment.NM_HOST.key());
- require(appMasterHostname != null,
- "ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key());
+ LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
- LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);
+ // Flink configuration
+ final Map<String, String> dynamicProperties =
+ FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+ LOG.debug("YARN dynamic properties: {}", dynamicProperties);
- // Flink configuration
- final Map<String, String> dynamicProperties =
- FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
- LOG.debug("YARN dynamic properties: {}", dynamicProperties);
+ final Configuration config = createConfiguration(currDir, dynamicProperties);
- final Configuration config = createConfiguration(currDir, dynamicProperties);
+ // Hadoop/Yarn configuration (loads config data automatically from classpath files)
+ final YarnConfiguration yarnConfig = new YarnConfiguration();
- // Hadoop/Yarn configuration (loads config data automatically from classpath files)
- final YarnConfiguration yarnConfig = new YarnConfiguration();
+ try {
final int taskManagerContainerMemory;
final int numInitialTaskManagers;
@@ -295,8 +299,8 @@ public class YarnApplicationMasterRunner {
actorSystem,
futureExecutor,
ioExecutor,
- new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
- scala.Option.<String>empty(),
+ new Some<>(JobManager.JOB_MANAGER_NAME()),
+ Option.<String>empty(),
getJobManagerClass(),
getArchivistClass())._1();
@@ -329,7 +333,6 @@ public class YarnApplicationMasterRunner {
ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
-
// 4: Process reapers
// The process reapers ensure that upon unexpected actor death, the process exits
// and does not stay lingering around unresponsive
@@ -364,6 +367,9 @@ public class YarnApplicationMasterRunner {
}
}
+ futureExecutor.shutdownNow();
+ ioExecutor.shutdownNow();
+
return INIT_ERROR_EXIT_CODE;
}
@@ -382,8 +388,11 @@ public class YarnApplicationMasterRunner {
}
}
- futureExecutor.shutdownNow();
- ioExecutor.shutdownNow();
+ ExecutorUtils.gracefulShutdown(
+ AkkaUtils.getTimeout(config).toMillis(),
+ TimeUnit.MILLISECONDS,
+ futureExecutor,
+ ioExecutor);
return 0;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index d7df66a..28615e6 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -46,8 +46,9 @@ import scala.language.postfixOps
* to start/administer/stop the Yarn session.
*
* @param flinkConfiguration Configuration object for the actor
- * @param executor Execution context which is used to execute concurrent tasks in the
+ * @param futureExecutor Execution context which is used to execute concurrent tasks in the
* [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+ * @param ioExecutor for blocking io operations
* @param instanceManager Instance manager to manage the registered
* [[org.apache.flink.runtime.taskmanager.TaskManager]]
* @param scheduler Scheduler to schedule Flink jobs
@@ -59,7 +60,8 @@ import scala.language.postfixOps
*/
class YarnJobManager(
flinkConfiguration: FlinkConfiguration,
- executor: Executor,
+ futureExecutor: Executor,
+ ioExecutor: Executor,
instanceManager: InstanceManager,
scheduler: FlinkScheduler,
libraryCacheManager: BlobLibraryCacheManager,
@@ -74,7 +76,8 @@ class YarnJobManager(
metricsRegistry: Option[MetricRegistry])
extends JobManager(
flinkConfiguration,
- executor,
+ futureExecutor,
+ ioExecutor,
instanceManager,
scheduler,
libraryCacheManager,