You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:01 UTC
[42/52] [abbrv] flink git commit: [FLINK-5254] [yarn] Implement YARN
High-Availability Services
http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
new file mode 100644
index 0000000..7aa481f
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for the high availability services for Flink YARN applications that support
+ * no master fail over.
+ *
+ * <p>Internally, these services put their recovery data into YARN's working directory,
+ * except for checkpoints, which are in the configured checkpoint directory. That way,
+ * checkpoints can be resumed with a new job/application, even if the complete YARN application
+ * is killed and cleaned up.
+ */
+public abstract class AbstractYarnNonHaServices extends YarnHighAvailabilityServices {
+
+ /** The constant name of the ResourceManager RPC endpoint */
+ protected static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
+
+ // ------------------------------------------------------------------------
+
+    /**
+     * Sets up the non-HA YARN services. The complete initialization (Hadoop file system,
+     * working directory, and recovery data directory) is delegated to the parent constructor.
+     *
+     * <p>As required by the parent class, the default Hadoop file system configured in the
+     * given Hadoop configuration must be an HDFS.
+     *
+     * @param config The Flink configuration of this component / process.
+     * @param hadoopConf The Hadoop configuration for the YARN cluster.
+     *
+     * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails.
+     */
+    protected AbstractYarnNonHaServices(
+            Configuration config, org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+        super(config, hadoopConf);
+    }
+
+ // ------------------------------------------------------------------------
+ // Names
+ // ------------------------------------------------------------------------
+
+    /**
+     * Returns the name of the ResourceManager RPC endpoint, which is the constant
+     * {@link #RESOURCE_MANAGER_RPC_ENDPOINT_NAME}.
+     */
+    @Override
+    public String getResourceManagerEndpointName() {
+        return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
+    }
+
+ // ------------------------------------------------------------------------
+ // Services
+ // ------------------------------------------------------------------------
+
+ @Override
+ public RunningJobsRegistry getRunningJobsRegistry() throws IOException {
+ enter();
+ try {
+ // IMPORTANT: The registry must NOT place its data in a directory that is
+ // cleaned up by these services.
+ return new FsNegativeRunningJobsRegistry(flinkFileSystem, workingDirectory);
+ }
+ finally {
+ exit();
+ }
+ }
+
+ @Override
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+ enter();
+ try {
+ return new StandaloneCheckpointRecoveryFactory();
+ }
+ finally {
+ exit();
+ }
+ }
+
+    /**
+     * Not supported by these services; this method always fails.
+     *
+     * @throws UnsupportedOperationException Always thrown.
+     */
+    @Override
+    public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+        throw new UnsupportedOperationException(
+                "These High-Availability Services do not support storing job graphs");
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
new file mode 100644
index 0000000..4c78726
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.FileSystemBlobStore;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The basis of {@link HighAvailabilityServices} for YARN setups.
+ * These high-availability services auto-configure YARN's HDFS and the YARN application's
+ * working directory to be used to store job recovery data.
+ *
+ * <p>Note for implementers: This class locks access to and creation of services,
+ * to make sure all services are properly shut down when shutting down this class.
+ * To participate in the checks, overriding methods should frame the method body with
+ * calls to {@code enter()} and {@code exit()} as shown in the following pattern:
+ *
+ * <pre>{@code
+ * public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ * enter();
+ * try {
+ * CuratorClient client = getCuratorClient();
+ * return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
+ * } finally {
+ * exit();
+ * }
+ * }
+ * }</pre>
+ */
+public abstract class YarnHighAvailabilityServices implements HighAvailabilityServices {
+
+ /** The name of the sub directory in which Flink stores the recovery data */
+ public static final String FLINK_RECOVERY_DATA_DIR = "flink_recovery_data";
+
+ /** Logger for these services, shared with subclasses */
+ protected static final Logger LOG = LoggerFactory.getLogger(YarnHighAvailabilityServices.class);
+
+ // ------------------------------------------------------------------------
+
+ /** The lock that guards all accesses to methods in this class */
+ private final ReentrantLock lock;
+
+ /** The Flink FileSystem object that represents the HDFS used by YARN */
+ protected final FileSystem flinkFileSystem;
+
+ /** The Hadoop FileSystem object that represents the HDFS used by YARN */
+ protected final org.apache.hadoop.fs.FileSystem hadoopFileSystem;
+
+ /** The working directory of this YARN application.
+ * This MUST NOT be deleted when the HA services clean up */
+ protected final Path workingDirectory;
+
+ /** The directory for HA persistent data. This should be deleted when the
+ * HA services clean up */
+ protected final Path haDataDirectory;
+
+ /** Flag marking this instance as shut down */
+ private volatile boolean closed;
+
+ // ------------------------------------------------------------------------
+
+    /**
+     * Creates new YARN high-availability services, configuring the file system and recovery
+     * data directory based on the working directory in the given Hadoop configuration.
+     *
+     * <p>This class requires that the default Hadoop file system configured in the given
+     * Hadoop configuration is an HDFS.
+     *
+     * <p>The Hadoop file system created here is a private (non-cached) instance. If any
+     * initialization step after its creation fails, the file system is closed again before
+     * the exception is propagated, so that a failed construction does not leak it.
+     *
+     * @param config The Flink configuration of this component / process.
+     * @param hadoopConf The Hadoop configuration for the YARN cluster.
+     *
+     * @throws IOException Thrown, if the default file system is not an HDFS, if the Hadoop file
+     *                     system used by YARN cannot be initialized, or if the directory for
+     *                     the recovery data cannot be created.
+     */
+    protected YarnHighAvailabilityServices(
+            Configuration config,
+            org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+        checkNotNull(config);
+        checkNotNull(hadoopConf);
+
+        this.lock = new ReentrantLock();
+
+        // get and verify the YARN HDFS URI
+        // (equalsIgnoreCase is null-safe and does not depend on the default locale)
+        final URI fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConf);
+        if (!"hdfs".equalsIgnoreCase(fsUri.getScheme())) {
+            throw new IOException("Invalid file system found for YarnHighAvailabilityServices: " +
+                    "Expected 'hdfs', but found '" + fsUri.getScheme() + "'.");
+        }
+
+        // initialize the Hadoop File System
+        // we go through this special code path here to make sure we get no shared cached
+        // instance of the FileSystem
+        try {
+            final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass =
+                    org.apache.hadoop.fs.FileSystem.getFileSystemClass(fsUri.getScheme(), hadoopConf);
+
+            this.hadoopFileSystem = InstantiationUtil.instantiate(fsClass);
+            this.hadoopFileSystem.initialize(fsUri, hadoopConf);
+        }
+        catch (Exception e) {
+            throw new IOException("Cannot instantiate YARN's Hadoop file system for " + fsUri, e);
+        }
+
+        // from here on we own an initialized file system: track whether the remaining
+        // initialization succeeds, and close the file system again if it does not
+        boolean successful = false;
+        try {
+            this.flinkFileSystem = new HadoopFileSystem(hadoopConf, hadoopFileSystem);
+
+            this.workingDirectory = new Path(hadoopFileSystem.getWorkingDirectory().toUri());
+            this.haDataDirectory = new Path(workingDirectory, FLINK_RECOVERY_DATA_DIR);
+
+            // test the file system, to make sure we fail fast if access does not work
+            try {
+                flinkFileSystem.mkdirs(haDataDirectory);
+            }
+            catch (Exception e) {
+                throw new IOException("Could not create the directory for recovery data in YARN's file system at '"
+                        + haDataDirectory + "'.", e);
+            }
+
+            successful = true;
+        }
+        finally {
+            if (!successful) {
+                // do not let a failure to close hide the original initialization failure
+                try {
+                    hadoopFileSystem.close();
+                } catch (Throwable t) {
+                    LOG.warn("Error closing Hadoop FileSystem", t);
+                }
+            }
+        }
+
+        LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory);
+    }
+
+ // ------------------------------------------------------------------------
+ // high availability services
+ // ------------------------------------------------------------------------
+
+ @Override
+ public BlobStore createBlobStore() throws IOException {
+ enter();
+ try {
+ return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
+ } finally {
+ exit();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Shutdown
+ // ------------------------------------------------------------------------
+
+    /**
+     * Tells whether these services have already been shut down.
+     *
+     * <p>This reads the volatile {@code closed} flag without acquiring the lock.
+     *
+     * @return True, if this instance has been shut down, false if it is still operational.
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * Shuts these services down by marking them as closed and closing the Hadoop file system.
+     *
+     * <p>Only the first call has an effect; later calls return without doing anything.
+     * A failure to close the Hadoop file system is logged and not propagated.
+     */
+    @Override
+    public void close() throws Exception {
+        lock.lock();
+        try {
+            // the flag makes sure that the actual shutdown happens at most once
+            if (!closed) {
+                closed = true;
+
+                // exceptions are deliberately only logged, not propagated
+                try {
+                    hadoopFileSystem.close();
+                }
+                catch (Throwable t) {
+                    LOG.warn("Error closing Hadoop FileSystem", t);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+ @Override
+ public void closeAndCleanupAllData() throws Exception {
+ lock.lock();
+ try {
+ checkState(!closed, "YarnHighAvailabilityServices are already closed");
+
+ // we remember exceptions only, then continue cleanup, and re-throw at the end
+ Throwable exception = null;
+
+ // first, we delete all data in Flink's data directory
+ try {
+ flinkFileSystem.delete(haDataDirectory, true);
+ }
+ catch (Throwable t) {
+ exception = t;
+ }
+
+ // now we actually close the services
+ try {
+ close();
+ }
+ catch (Throwable t) {
+ exception = firstOrSuppressed(t, exception);
+ }
+
+ // if some exception occurred, rethrow
+ if (exception != null) {
+ ExceptionUtils.rethrowException(exception, exception.getMessage());
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * To be called at the beginning of every method that creates an HA service. Acquires the lock
+ * and check whether this HighAvailabilityServices instance is shut down.
+ */
+ void enter() {
+ if (!enterUnlessClosed()) {
+ throw new IllegalStateException("closed");
+ }
+ }
+
+    /**
+     * Acquires the lock, unless the services are already closed. In the closed case, the lock
+     * is released again right away and the method returns {@code false}.
+     *
+     * @return True, if the lock was acquired and the services are not closed, false if the services are closed.
+     */
+    boolean enterUnlessClosed() {
+        lock.lock();
+        if (closed) {
+            // nothing may be entered any more, so do not keep the lock
+            lock.unlock();
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * To be called at the end of every method that creates an HA service. Releases the lock
+     * that was acquired by {@link #enter()} or by a successful {@link #enterUnlessClosed()}.
+     */
+    void exit() {
+        lock.unlock();
+    }
+
+ // ------------------------------------------------------------------------
+ // Factory from Configuration
+ // ------------------------------------------------------------------------
+
+    /**
+     * Factory for the high-availability services of a single-job Flink YARN application. The
+     * services are meant for the Application Master that runs both ResourceManager and JobManager.
+     *
+     * <p>The implementation is selected by the high-availability mode in the Flink
+     * configuration. Currently, only the mode {@code NONE} is implemented.
+     *
+     * @param flinkConfig The Flink configuration.
+     * @param hadoopConfig The Hadoop configuration for the YARN cluster.
+     *
+     * @return The created high-availability services.
+     *
+     * @throws IOException Thrown, if the high-availability services could not be initialized.
+     * @throws UnsupportedOperationException Thrown, if the configured mode is {@code ZOOKEEPER}.
+     * @throws IllegalConfigurationException Thrown, if the configured mode is not recognized.
+     */
+    public static YarnHighAvailabilityServices forSingleJobAppMaster(
+            Configuration flinkConfig,
+            org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException {
+
+        checkNotNull(flinkConfig, "flinkConfig");
+        checkNotNull(hadoopConfig, "hadoopConfig");
+
+        final HighAvailabilityMode haMode = HighAvailabilityMode.fromConfig(flinkConfig);
+        switch (haMode) {
+            case NONE:
+                return new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
+
+            case ZOOKEEPER:
+                throw new UnsupportedOperationException("to be implemented");
+
+            default:
+                throw new IllegalConfigurationException("Unrecognized high availability mode: " + haMode);
+        }
+    }
+
+    /**
+     * Factory for the high-availability services used by the TaskManagers that participate
+     * in a Flink YARN application.
+     *
+     * <p>The implementation is selected by the high-availability mode in the Flink
+     * configuration. Currently, only the mode {@code NONE} is implemented.
+     *
+     * @param flinkConfig The Flink configuration.
+     * @param hadoopConfig The Hadoop configuration for the YARN cluster.
+     *
+     * @return The created high-availability services.
+     *
+     * @throws IOException Thrown, if the high-availability services could not be initialized.
+     * @throws UnsupportedOperationException Thrown, if the configured mode is {@code ZOOKEEPER}.
+     * @throws IllegalConfigurationException Thrown, if the configured mode is not recognized.
+     */
+    public static YarnHighAvailabilityServices forYarnTaskManager(
+            Configuration flinkConfig,
+            org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException {
+
+        checkNotNull(flinkConfig, "flinkConfig");
+        checkNotNull(hadoopConfig, "hadoopConfig");
+
+        final HighAvailabilityMode haMode = HighAvailabilityMode.fromConfig(flinkConfig);
+        switch (haMode) {
+            case NONE:
+                return new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+
+            case ZOOKEEPER:
+                throw new UnsupportedOperationException("to be implemented");
+
+            default:
+                throw new IllegalConfigurationException("Unrecognized high availability mode: " + haMode);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
new file mode 100644
index 0000000..fd1a45e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.ServicesThreadFactory;
+import org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * These YarnHighAvailabilityServices are for the Application Master in setups where there is one
+ * ResourceManager that is statically configured in the Flink configuration.
+ *
+ * <h3>Handled failure types</h3>
+ * <ul>
+ * <li><b>User code &amp; operator failures:</b> Failed operators are recovered from checkpoints.</li>
+ * <li><b>Task Manager Failures:</b> Failed Task Managers are restarted and their tasks are
+ * recovered from checkpoints.</li>
+ * </ul>
+ *
+ * <h3>Non-recoverable failure types</h3>
+ * <ul>
+ * <li><b>Application Master failures:</b> These failures cannot be recovered, because TaskManagers
+ * have no way to discover the new Application Master's address.</li>
+ * </ul>
+ *
+ * <p>Internally, these services put their recovery data into YARN's working directory,
+ * except for checkpoints, which are in the configured checkpoint directory. That way,
+ * checkpoints can be resumed with a new job/application, even if the complete YARN application
+ * is killed and cleaned up.
+ *
+ * <p>Because ResourceManager and JobManager run both in the same process (Application Master), they
+ * use an embedded leader election service to find each other.
+ *
+ * <p>A typical YARN setup that uses these HA services first starts the ResourceManager
+ * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which
+ * the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures
+ * of the JobManager and ResourceManager, which are running as part of the Application Master.
+ *
+ * @see HighAvailabilityServices
+ */
+public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
+
+ /** The dispatcher thread pool for these services */
+ private final ExecutorService dispatcher;
+
+ /** The embedded leader election service used by JobManagers to find the resource manager */
+ private final SingleLeaderElectionService resourceManagerLeaderElectionService;
+
+ // ------------------------------------------------------------------------
+
+    /**
+     * Creates new YarnIntraNonHaMasterServices for the given Flink and YARN configuration.
+     *
+     * <p>The parent constructor initializes the access to the HDFS that stores the recovery
+     * data. This constructor then creates the single-threaded dispatcher and, on top of it,
+     * the embedded leader election service for the ResourceManager. If that second part
+     * fails, the parent's resources are released again before the failure is propagated.
+     *
+     * @param config The Flink configuration of this component / process.
+     * @param hadoopConf The Hadoop configuration for the YARN cluster.
+     *
+     * @throws IOException
+     *             Thrown, if the initialization of the Hadoop file system used by YARN fails.
+     */
+    public YarnIntraNonHaMasterServices(
+            Configuration config,
+            org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+        super(config, hadoopConf);
+
+        // flipped to true only once everything below has been set up
+        boolean initialized = false;
+
+        try {
+            this.dispatcher = Executors.newSingleThreadExecutor(new ServicesThreadFactory());
+            this.resourceManagerLeaderElectionService = new SingleLeaderElectionService(dispatcher, DEFAULT_LEADER_ID);
+
+            initialized = true;
+        }
+        finally {
+            if (!initialized) {
+                // roll back the parent's initialization; failures of the rollback are
+                // ignored on purpose, so that the original exception is the one propagated
+                try {
+                    super.close();
+                } catch (Throwable ignored) {}
+            }
+        }
+    }
+
+ // ------------------------------------------------------------------------
+ // Services
+ // ------------------------------------------------------------------------
+
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ enter();
+ try {
+ return resourceManagerLeaderElectionService.createLeaderRetrievalService();
+ }
+ finally {
+ exit();
+ }
+ }
+
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() {
+ enter();
+ try {
+ return resourceManagerLeaderElectionService;
+ }
+ finally {
+ exit();
+ }
+ }
+
+ @Override
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+ enter();
+ try {
+ throw new UnsupportedOperationException("needs refactoring to accept default address");
+ }
+ finally {
+ exit();
+ }
+ }
+
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+ enter();
+ try {
+ throw new UnsupportedOperationException("needs refactoring to accept default address");
+ }
+ finally {
+ exit();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // shutdown
+ // ------------------------------------------------------------------------
+
+    /**
+     * Shuts down the embedded leader election service and the dispatcher, and then closes
+     * the parent services. Does nothing if the services are already closed.
+     *
+     * <p>Every cleanup step runs even if an earlier step failed: the dispatcher is stopped
+     * even when shutting down the leader election service throws, and the parent cleanup
+     * runs in any case.
+     *
+     * @throws Exception Thrown, if one of the cleanup steps fails.
+     */
+    @Override
+    public void close() throws Exception {
+        if (enterUnlessClosed()) {
+            try {
+                try {
+                    // this class' own cleanup logic
+                    try {
+                        resourceManagerLeaderElectionService.shutdown();
+                    }
+                    finally {
+                        // must not be skipped when the shutdown above fails, otherwise
+                        // the dispatcher executor would never be shut down
+                        dispatcher.shutdownNow();
+                    }
+                }
+                finally {
+                    // in any case must we call the parent cleanup logic
+                    super.close();
+                }
+            }
+            finally {
+                exit();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
new file mode 100644
index 0000000..eb4b77e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import java.io.IOException;
+
+/**
+ * These YarnHighAvailabilityServices are for use by the TaskManager in setups,
+ * where there is one ResourceManager that is statically configured in the Flink configuration.
+ *
+ * <h3>Handled failure types</h3>
+ * <ul>
+ * <li><b>User code &amp; operator failures:</b> Failed operators are recovered from checkpoints.</li>
+ * <li><b>Task Manager Failures:</b> Failed Task Managers are restarted and their tasks are
+ * recovered from checkpoints.</li>
+ * </ul>
+ *
+ * <h3>Non-recoverable failure types</h3>
+ * <ul>
+ * <li><b>Application Master failures:</b> These failures cannot be recovered, because TaskManagers
+ * have no way to discover the new Application Master's address.</li>
+ * </ul>
+ *
+ * <p>Internally, these services put their recovery data into YARN's working directory,
+ * except for checkpoints, which are in the configured checkpoint directory. That way,
+ * checkpoints can be resumed with a new job/application, even if the complete YARN application
+ * is killed and cleaned up.
+ *
+ * <p>A typical YARN setup that uses these HA services first starts the ResourceManager
+ * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which
+ * the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures
+ * of the JobManager and ResourceManager, which are running as part of the Application Master.
+ *
+ * @see HighAvailabilityServices
+ */
+public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServices {
+
+ /** The RPC URL under which the single ResourceManager can be reached while available */
+ private final String resourceManagerRpcUrl;
+
+ // ------------------------------------------------------------------------
+
+    /**
+     * Creates new YarnPreConfiguredMasterNonHaServices for the given Flink and YARN configuration.
+     *
+     * <p>The parent constructor sets up the HDFS access to store recovery data in the YARN
+     * application's working directory. This constructor then reads the ResourceManager's
+     * address and port from the Flink configuration and builds the ResourceManager's RPC URL.
+     * If that second part fails, the parent's resources are released again before the
+     * failure is propagated.
+     *
+     * @param config The Flink configuration of this component / process.
+     * @param hadoopConf The Hadoop configuration for the YARN cluster.
+     *
+     * @throws IOException
+     *             Thrown, if the initialization of the Hadoop file system used by YARN fails.
+     * @throws IllegalConfigurationException
+     *             Thrown, if the Flink configuration does not properly describe the ResourceManager address and port.
+     */
+    public YarnPreConfiguredMasterNonHaServices(
+            Configuration config,
+            org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+        super(config, hadoopConf);
+
+        // flipped to true only once everything below has been set up
+        boolean initialized = false;
+
+        try {
+            // look up where the resource manager can be reached
+            final String host = config.getString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS);
+            final int port = config.getInteger(YarnConfigOptions.APP_MASTER_RPC_PORT);
+
+            if (host == null) {
+                throw new IllegalConfigurationException("Config parameter '" +
+                        YarnConfigOptions.APP_MASTER_RPC_ADDRESS.key() + "' is missing.");
+            }
+            // a negative port is reported as a missing parameter
+            if (port < 0) {
+                throw new IllegalConfigurationException("Config parameter '" +
+                        YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' is missing.");
+            }
+            // negative values were rejected above, so only zero and too large values remain
+            if (port == 0 || port > 65535) {
+                throw new IllegalConfigurationException("Invalid value for '" +
+                        YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' - port must be in [1, 65535]");
+            }
+
+            this.resourceManagerRpcUrl = RpcServiceUtils.getRpcUrl(
+                    host, port, RESOURCE_MANAGER_RPC_ENDPOINT_NAME, config);
+
+            initialized = true;
+        }
+        finally {
+            if (!initialized) {
+                // roll back the parent's initialization; failures of the rollback are
+                // ignored on purpose, so that the original exception is the one propagated
+                try {
+                    super.close();
+                } catch (Throwable ignored) {}
+            }
+        }
+    }
+
+ // ------------------------------------------------------------------------
+ // Services
+ // ------------------------------------------------------------------------
+
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ enter();
+ try {
+ return new StandaloneLeaderRetrievalService(resourceManagerRpcUrl, DEFAULT_LEADER_ID);
+ }
+ finally {
+ exit();
+ }
+ }
+
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() {
+ enter();
+ try {
+ throw new UnsupportedOperationException("Not supported on the TaskManager side");
+ }
+ finally {
+ exit();
+ }
+ }
+
+ @Override
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+ enter();
+ try {
+ throw new UnsupportedOperationException("needs refactoring to accept default address");
+ }
+ finally {
+ exit();
+ }
+ }
+
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+ enter();
+ try {
+ throw new UnsupportedOperationException("needs refactoring to accept default address");
+ }
+ finally {
+ exit();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
new file mode 100644
index 0000000..0e7bf0f
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link YarnIntraNonHaMasterServices}.
+ *
+ * <p>All tests share one in-process {@link MiniDFSCluster} that is started before
+ * the first test and shut down after the last one.
+ */
+public class YarnIntraNonHaMasterServicesTest extends TestLogger {
+
+ /**
+  * Maximum time (in milliseconds) to wait for the contender to be notified.
+  * Mockito's {@code timeout(...)} verification returns as soon as the expected
+  * call has been observed, so a generous bound only matters on slow machines.
+  */
+ private static final int NOTIFICATION_TIMEOUT_MILLIS = 2000;
+
+ private static final Random RND = new Random();
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+ private static MiniDFSCluster HDFS_CLUSTER;
+
+ private static Path HDFS_ROOT_PATH;
+
+ private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+ // ------------------------------------------------------------------------
+ //  Test setup and shutdown
+ // ------------------------------------------------------------------------
+
+ /**
+  * Starts the shared HDFS mini cluster, using a fresh temporary folder as its
+  * base directory, and remembers the cluster's root URI.
+  */
+ @BeforeClass
+ public static void createHDFS() throws Exception {
+     final File tempDir = TEMP_DIR.newFolder();
+
+     org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+     hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+     HDFS_CLUSTER = builder.build();
+     HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI());
+ }
+
+ /**
+  * Shuts the shared HDFS mini cluster down (if it was started) and clears the
+  * static references.
+  */
+ @AfterClass
+ public static void destroyHDFS() {
+     if (HDFS_CLUSTER != null) {
+         HDFS_CLUSTER.shutdown();
+     }
+     HDFS_CLUSTER = null;
+     HDFS_ROOT_PATH = null;
+ }
+
+ /**
+  * Creates a fresh Hadoop configuration per test whose default file system is
+  * the shared HDFS mini cluster.
+  */
+ @Before
+ public void initConfig() {
+     hadoopConfig = new org.apache.hadoop.conf.Configuration();
+     hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, HDFS_ROOT_PATH.toString());
+ }
+
+ // ------------------------------------------------------------------------
+ //  Tests
+ // ------------------------------------------------------------------------
+
+ /**
+  * Verifies that {@code close()} can be called after
+  * {@code closeAndCleanupAllData()} without throwing.
+  */
+ @Test
+ public void testRepeatedClose() throws Exception {
+     final Configuration flinkConfig = new Configuration();
+
+     final YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
+     services.closeAndCleanupAllData();
+
+     // this should not throw an exception
+     services.close();
+ }
+
+ /**
+  * Verifies that closing the services while a contender is registered with the
+  * ResourceManager leader election service reports exactly one error to that
+  * contender.
+  */
+ @Test
+ public void testClosingReportsToLeader() throws Exception {
+     final Configuration flinkConfig = new Configuration();
+
+     try (YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig)) {
+         final LeaderElectionService elector = services.getResourceManagerLeaderElectionService();
+         final LeaderContender contender = mockContender(elector);
+
+         elector.start(contender);
+
+         // the explicit close is the action under test; the try-with-resources
+         // block closes the services a second time on exit
+         services.close();
+
+         verify(contender, timeout(NOTIFICATION_TIMEOUT_MILLIS).times(1)).handleError(any(Exception.class));
+     }
+ }
+
+ // ------------------------------------------------------------------------
+ //  utilities
+ // ------------------------------------------------------------------------
+
+ /**
+  * Creates a mock contender with a randomly generated address.
+  *
+  * @param service the election service the contender confirms leadership to
+  * @return the mocked contender
+  */
+ private static LeaderContender mockContender(final LeaderElectionService service) {
+     String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
+     return mockContender(service, address);
+ }
+
+ /**
+  * Creates a mock contender that reports the given address and that, when
+  * granted leadership, confirms the granted session ID back to the service.
+  *
+  * @param service the election service the contender confirms leadership to
+  * @param address the address returned by the contender's {@code getAddress()}
+  * @return the mocked contender
+  */
+ private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
+     LeaderContender mockContender = mock(LeaderContender.class);
+
+     when(mockContender.getAddress()).thenReturn(address);
+
+     doAnswer(new Answer<Void>() {
+         @Override
+         public Void answer(InvocationOnMock invocation) throws Throwable {
+             final UUID uuid = (UUID) invocation.getArguments()[0];
+             service.confirmLeaderSessionID(uuid);
+             return null;
+         }
+     }).when(mockContender).grantLeadership(any(UUID.class));
+
+     return mockContender;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
new file mode 100644
index 0000000..a13deac
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.*;
+
+
+public class YarnPreConfiguredMasterHaServicesTest extends TestLogger {
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+ private static MiniDFSCluster HDFS_CLUSTER;
+
+ private static Path HDFS_ROOT_PATH;
+
+ private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+ // ------------------------------------------------------------------------
+ // Test setup and shutdown
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void createHDFS() throws Exception {
+ final File tempDir = TEMP_DIR.newFolder();
+
+ org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ HDFS_CLUSTER = builder.build();
+ HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI());
+ }
+
+ @AfterClass
+ public static void destroyHDFS() {
+ if (HDFS_CLUSTER != null) {
+ HDFS_CLUSTER.shutdown();
+ }
+ HDFS_CLUSTER = null;
+ HDFS_ROOT_PATH = null;
+ }
+
+ @Before
+ public void initConfig() {
+ hadoopConfig = new org.apache.hadoop.conf.Configuration();
+ hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, HDFS_ROOT_PATH.toString());
+ }
+
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testConstantResourceManagerName() throws Exception {
+ final Configuration flinkConfig = new Configuration();
+ flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+ flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+ YarnHighAvailabilityServices services1 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+ YarnHighAvailabilityServices services2 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+
+ try {
+ String rmName1 = services1.getResourceManagerEndpointName();
+ String rmName2 = services2.getResourceManagerEndpointName();
+
+ assertNotNull(rmName1);
+ assertNotNull(rmName2);
+ assertEquals(rmName1, rmName2);
+ }
+ finally {
+ services1.closeAndCleanupAllData();
+ services2.closeAndCleanupAllData();
+ }
+ }
+
+ @Test
+ public void testMissingRmConfiguration() throws Exception {
+ final Configuration flinkConfig = new Configuration();
+
+ // missing resource manager address
+ try {
+ new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+ fail();
+ } catch (IllegalConfigurationException e) {
+ // expected
+ }
+
+ flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+
+ // missing resource manager port
+ try {
+ new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+ fail();
+ } catch (IllegalConfigurationException e) {
+ // expected
+ }
+
+ flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+ // now everything is good ;-)
+ new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig).closeAndCleanupAllData();
+ }
+
+ @Test
+ public void testCloseAndCleanup() throws Exception {
+ final Configuration flinkConfig = new Configuration();
+ flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+ flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+ // create the services
+ YarnHighAvailabilityServices services = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+ services.closeAndCleanupAllData();
+
+ final FileSystem fileSystem = HDFS_ROOT_PATH.getFileSystem();
+ final Path workDir = new Path(HDFS_CLUSTER.getFileSystem().getWorkingDirectory().toString());
+
+ try {
+ fileSystem.getFileStatus(new Path(workDir, YarnHighAvailabilityServices.FLINK_RECOVERY_DATA_DIR));
+ fail("Flink recovery data directory still exists");
+ }
+ catch (FileNotFoundException e) {
+ // expected, because the directory should have been cleaned up
+ }
+
+ assertTrue(services.isClosed());
+
+ // doing another cleanup when the services are closed should fail
+ try {
+ services.closeAndCleanupAllData();
+ fail("should fail with an IllegalStateException");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCallsOnClosedServices() throws Exception {
+ final Configuration flinkConfig = new Configuration();
+ flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+ flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+ YarnHighAvailabilityServices services = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+
+ // this method is not supported
+ try {
+ services.getSubmittedJobGraphStore();
+ fail();
+ } catch (UnsupportedOperationException ignored) {}
+
+
+ services.close();
+
+ // all these methods should fail now
+
+ try {
+ services.createBlobStore();
+ fail();
+ } catch (IllegalStateException ignored) {}
+
+ try {
+ services.getCheckpointRecoveryFactory();
+ fail();
+ } catch (IllegalStateException ignored) {}
+
+ try {
+ services.getJobManagerLeaderElectionService(new JobID());
+ fail();
+ } catch (IllegalStateException ignored) {}
+
+ try {
+ services.getJobManagerLeaderRetriever(new JobID());
+ fail();
+ } catch (IllegalStateException ignored) {}
+
+ try {
+ services.getRunningJobsRegistry();
+ fail();
+ } catch (IllegalStateException ignored) {}
+
+ try {
+ services.getResourceManagerLeaderElectionService();
+ fail();
+ } catch (IllegalStateException ignored) {}
+
+ try {
+ services.getResourceManagerLeaderRetriever();
+ fail();
+ } catch (IllegalStateException ignored) {}
+ }
+}