Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:01 UTC

[42/52] [abbrv] flink git commit: [FLINK-5254] [yarn] Implement YARN High-Availability Services

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
new file mode 100644
index 0000000..7aa481f
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for the high-availability services of Flink YARN applications that do not
+ * support master failover.
+ *
+ * <p>Internally, these services put their recovery data into YARN's working directory,
+ * except for checkpoints, which are in the configured checkpoint directory. That way,
+ * checkpoints can be resumed with a new job/application, even if the complete YARN application
+ * is killed and cleaned up.
+ */
+public abstract class AbstractYarnNonHaServices extends YarnHighAvailabilityServices {
+
+	/** The constant name of the ResourceManager RPC endpoint */
+	protected static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates new YARN high-availability services, configuring the file system and recovery
+	 * data directory based on the working directory in the given Hadoop configuration.
+	 *
+	 * <p>This class requires that the default Hadoop file system configured in the given
+	 * Hadoop configuration is an HDFS.
+	 *
+	 * @param config     The Flink configuration of this component / process.
+	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
+	 *
+	 * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails.
+	 */
+	protected AbstractYarnNonHaServices(
+			Configuration config,
+			org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+		super(config, hadoopConf);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Names
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String getResourceManagerEndpointName() {
+		return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() throws IOException {
+		enter();
+		try {
+			// IMPORTANT: The registry must NOT place its data in a directory that is
+			// cleaned up by these services.
+			return new FsNegativeRunningJobsRegistry(flinkFileSystem, workingDirectory);
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+		enter();
+		try {
+			return new StandaloneCheckpointRecoveryFactory();
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		throw new UnsupportedOperationException("These High-Availability Services do not support storing job graphs");
+	}
+}
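
As a quick orientation for what these non-HA services provide and refuse, the following is a minimal caller-side sketch. The wrapper class and method are hypothetical and not part of this commit; only methods defined in the file above are called, and the concrete instance would come from one of the subclasses introduced further below.

    import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
    import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    import org.apache.flink.yarn.highavailability.AbstractYarnNonHaServices;

    final class NonHaServicesOverviewSketch {

        static void use(AbstractYarnNonHaServices services) throws Exception {
            // registry data lives in the YARN working directory, outside the directory
            // that closeAndCleanupAllData() deletes, so job status survives a cleanup
            RunningJobsRegistry registry = services.getRunningJobsRegistry();

            // checkpoints are not owned by these services; a standalone factory is returned
            CheckpointRecoveryFactory checkpoints = services.getCheckpointRecoveryFactory();

            // job graphs cannot be persisted without real high availability
            try {
                services.getSubmittedJobGraphStore();
            } catch (UnsupportedOperationException expected) {
                // expected in the non-HA case
            }
        }

        private NonHaServicesOverviewSketch() {}
    }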

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
new file mode 100644
index 0000000..4c78726
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.FileSystemBlobStore;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The basis of {@link HighAvailabilityServices} for YARN setups.
+ * These high-availability services auto-configure YARN's HDFS and the YARN application's
+ * working directory as the storage location for job recovery data.
+ * 
+ * <p>Note for implementers: This class locks access to and creation of services,
+ * to make sure all services are properly shut down when shutting down this class.
+ * To participate in the checks, overriding methods should frame their bodies with
+ * calls to {@code enter()} and {@code exit()} as shown in the following pattern:
+ * 
+ * <pre>{@code
+ * public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ *     enter();
+ *     try {
+ *         CuratorClient client = getCuratorClient();
+ *         return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
+ *     } finally {
+ *         exit();
+ *     }
+ * }
+ * }</pre>
+ */
+public abstract class YarnHighAvailabilityServices implements HighAvailabilityServices {
+
+	/** The name of the subdirectory in which Flink stores the recovery data */
+	public static final String FLINK_RECOVERY_DATA_DIR = "flink_recovery_data";
+
+	/** Logger for these services, shared with subclasses */
+	protected static final Logger LOG = LoggerFactory.getLogger(YarnHighAvailabilityServices.class);
+
+	// ------------------------------------------------------------------------
+
+	/** The lock that guards all accesses to methods in this class */
+	private final ReentrantLock lock;
+
+	/** The Flink FileSystem object that represents the HDFS used by YARN */
+	protected final FileSystem flinkFileSystem;
+
+	/** The Hadoop FileSystem object that represents the HDFS used by YARN */
+	protected final org.apache.hadoop.fs.FileSystem hadoopFileSystem;
+
+	/** The working directory of this YARN application.
+	 * This MUST NOT be deleted when the HA services clean up */
+	protected final Path workingDirectory;
+
+	/** The directory for HA persistent data. This should be deleted when the
+	 * HA services clean up */
+	protected final Path haDataDirectory;
+
+	/** Flag marking this instance as shut down */
+	private volatile boolean closed;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates new YARN high-availability services, configuring the file system and recovery
+	 * data directory based on the working directory in the given Hadoop configuration.
+	 * 
+	 * <p>This class requires that the default Hadoop file system configured in the given
+	 * Hadoop configuration is an HDFS.
+	 * 
+	 * @param config     The Flink configuration of this component / process.
+	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
+	 * 
+	 * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails.
+	 */
+	protected YarnHighAvailabilityServices(
+			Configuration config,
+			org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+		checkNotNull(config);
+		checkNotNull(hadoopConf);
+
+		this.lock = new ReentrantLock();
+
+		// get and verify the YARN HDFS URI
+		final URI fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConf);
+		if (fsUri.getScheme() == null || !"hdfs".equals(fsUri.getScheme().toLowerCase())) {
+			throw new IOException("Invalid file system found for YarnHighAvailabilityServices: " +
+					"Expected 'hdfs', but found '" + fsUri.getScheme() + "'.");
+		}
+
+		// initialize the Hadoop File System
+		// we go through this special code path here to make sure we get no shared cached
+		// instance of the FileSystem
+		try {
+			final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass =
+					org.apache.hadoop.fs.FileSystem.getFileSystemClass(fsUri.getScheme(), hadoopConf);
+
+			this.hadoopFileSystem = InstantiationUtil.instantiate(fsClass);
+			this.hadoopFileSystem.initialize(fsUri, hadoopConf);
+		}
+		catch (Exception e) {
+			throw new IOException("Cannot instantiate YARN's Hadoop file system for " + fsUri, e);
+		}
+
+		this.flinkFileSystem = new HadoopFileSystem(hadoopConf, hadoopFileSystem);
+
+		this.workingDirectory = new Path(hadoopFileSystem.getWorkingDirectory().toUri());
+		this.haDataDirectory = new Path(workingDirectory, FLINK_RECOVERY_DATA_DIR);
+
+		// test the file system, to make sure we fail fast if access does not work
+		try {
+			flinkFileSystem.mkdirs(haDataDirectory);
+		}
+		catch (Exception e) {
+			throw new IOException("Could not create the directory for recovery data in YARN's file system at '"
+					+ haDataDirectory + "'.", e);
+		}
+
+		LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory);
+	}
+
+	// ------------------------------------------------------------------------
+	//  high availability services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public BlobStore createBlobStore() throws IOException {
+		enter();
+		try {
+			return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
+		} finally {
+			exit();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks whether these services have been shut down.
+	 *
+	 * @return True, if this instance has been shut down, false if it is still operational.
+	 */
+	public boolean isClosed() {
+		return closed;
+	}
+
+	@Override
+	public void close() throws Exception {
+		lock.lock();
+		try {
+			// close only once
+			if (closed) {
+				return;
+			}
+			closed = true;
+
+			// we do not propagate exceptions here, but only log them
+			try {
+				hadoopFileSystem.close();
+			} catch (Throwable t) {
+				LOG.warn("Error closing Hadoop FileSystem", t);
+			}
+		}
+		finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		lock.lock();
+		try {
+			checkState(!closed, "YarnHighAvailabilityServices are already closed");
+
+			// we remember exceptions only, then continue cleanup, and re-throw at the end
+			Throwable exception = null;
+
+			// first, we delete all data in Flink's data directory
+			try {
+				flinkFileSystem.delete(haDataDirectory, true);
+			}
+			catch (Throwable t) {
+				exception = t;
+			}
+
+			// now we actually close the services
+			try {
+				close();
+			}
+			catch (Throwable t) {
+				exception = firstOrSuppressed(t, exception);
+			}
+
+			// if some exception occurred, rethrow
+			if (exception != null) {
+				ExceptionUtils.rethrowException(exception, exception.getMessage());
+			}
+		}
+		finally {
+			lock.unlock();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * To be called at the beginning of every method that creates an HA service. Acquires the lock
+	 * and checks whether this HighAvailabilityServices instance is shut down.
+	 */
+	void enter() {
+		if (!enterUnlessClosed()) {
+			throw new IllegalStateException("closed");
+		}
+	}
+
+	/**
+	 * Acquires the lock and checks whether the services are already closed. If they are
+	 * already closed, the method releases the lock and returns {@code false}.
+	 * 
+	 * @return True, if the lock was acquired and the services are not closed, false if the services are closed.
+	 */
+	boolean enterUnlessClosed() {
+		lock.lock();
+		if (!closed) {
+			return true;
+		} else {
+			lock.unlock();
+			return false;
+		}
+	}
+
+	/**
+	 * To be called at the end of every method that creates an HA service. Releases the lock.
+	 */
+	void exit() {
+		lock.unlock();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factory from Configuration
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the high-availability services for a single-job Flink YARN application, to be
+	 * used in the Application Master that runs both ResourceManager and JobManager.
+	 * 
+	 * @param flinkConfig  The Flink configuration.
+	 * @param hadoopConfig The Hadoop configuration for the YARN cluster.
+	 * 
+	 * @return The created high-availability services.
+	 * 
+	 * @throws IOException Thrown, if the high-availability services could not be initialized.
+	 */
+	public static YarnHighAvailabilityServices forSingleJobAppMaster(
+			Configuration flinkConfig,
+			org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException {
+
+		checkNotNull(flinkConfig, "flinkConfig");
+		checkNotNull(hadoopConfig, "hadoopConfig");
+
+		final HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(flinkConfig);
+		switch (mode) {
+			case NONE:
+				return new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
+
+			case ZOOKEEPER:
+				throw new UnsupportedOperationException("to be implemented");
+
+			default:
+				throw new IllegalConfigurationException("Unrecognized high availability mode: " + mode);
+		}
+	}
+
+	/**
+	 * Creates the high-availability services for the TaskManagers participating in
+	 * a Flink YARN application.
+	 *
+	 * @param flinkConfig  The Flink configuration.
+	 * @param hadoopConfig The Hadoop configuration for the YARN cluster.
+	 *
+	 * @return The created high-availability services.
+	 *
+	 * @throws IOException Thrown, if the high-availability services could not be initialized.
+	 */
+	public static YarnHighAvailabilityServices forYarnTaskManager(
+			Configuration flinkConfig,
+			org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException {
+
+		checkNotNull(flinkConfig, "flinkConfig");
+		checkNotNull(hadoopConfig, "hadoopConfig");
+
+		final HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(flinkConfig);
+		switch (mode) {
+			case NONE:
+				return new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+
+			case ZOOKEEPER:
+				throw new UnsupportedOperationException("to be implemented");
+
+			default:
+				throw new IllegalConfigurationException("Unrecognized high availability mode: " + mode);
+		}
+	}
+}
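
For orientation, a minimal sketch of how the forSingleJobAppMaster factory above would be used. The caller class, the placeholder HDFS address, and the surrounding lifecycle are assumptions; with an empty Flink configuration, HighAvailabilityMode.fromConfig resolves to NONE, so the non-HA services are returned.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.yarn.highavailability.YarnHighAvailabilityServices;

    final class YarnHaServicesFactorySketch {

        static void runAppMasterServices() throws Exception {
            // Flink configuration of this process; no 'high-availability' mode is set,
            // so HighAvailabilityMode.fromConfig(...) resolves to NONE
            Configuration flinkConfig = new Configuration();

            // Hadoop configuration for the YARN cluster; the default file system must be
            // an hdfs:// URI or the services constructor throws an IOException
            org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
            hadoopConfig.set("fs.defaultFS", "hdfs://namenode:8020");   // placeholder address

            // Application Master side: ResourceManager and JobManager in the same process
            YarnHighAvailabilityServices services =
                    YarnHighAvailabilityServices.forSingleJobAppMaster(flinkConfig, hadoopConfig);
            try {
                // ... hand the services to ResourceManager / JobManager ...
            }
            finally {
                // close() keeps the recovery data; closeAndCleanupAllData() additionally deletes
                // the 'flink_recovery_data' directory under the application's working directory
                services.closeAndCleanupAllData();
            }
        }

        private YarnHaServicesFactorySketch() {}
    }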

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
new file mode 100644
index 0000000..fd1a45e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.ServicesThreadFactory;
+import org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * These YarnHighAvailabilityServices are for the Application Master in setups where there is one
+ * ResourceManager that is statically configured in the Flink configuration.
+ * 
+ * <h3>Handled failure types</h3>
+ * <ul>
+ *     <li><b>User code &amp; operator failures:</b> Failed operators are recovered from checkpoints.</li>
+ *     <li><b>Task Manager Failures:</b> Failed Task Managers are restarted and their tasks are
+ *         recovered from checkpoints.</li>
+ * </ul>
+ *
+ * <h3>Non-recoverable failure types</h3>
+ * <ul>
+ *     <li><b>Application Master failures:</b> These failures are not recoverable, because TaskManagers
+ *     have no way to discover the new Application Master's address.</li>
+ * </ul>
+ *
+ * <p>Internally, these services put their recovery data into YARN's working directory,
+ * except for checkpoints, which are in the configured checkpoint directory. That way,
+ * checkpoints can be resumed with a new job/application, even if the complete YARN application
+ * is killed and cleaned up. 
+ *
+ * <p>Because ResourceManager and JobManager both run in the same process (Application Master), they
+ * use an embedded leader election service to find each other.
+ * 
+ * <p>A typical YARN setup that uses these HA services first starts the ResourceManager
+ * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which
+ * the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures
+ * of the JobManager and ResourceManager, which are running as part of the Application Master.
+ *
+ * @see HighAvailabilityServices
+ */
+public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
+
+	/** The dispatcher thread pool for these services */
+	private final ExecutorService dispatcher;
+
+	/** The embedded leader election service used by JobManagers to find the resource manager */
+	private final SingleLeaderElectionService resourceManagerLeaderElectionService;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates new YarnIntraNonHaMasterServices for the given Flink and YARN configuration.
+	 * 
+	 * <p>This constructor initializes access to HDFS for storing recovery data, and creates the
+	 * embedded leader election service through which ResourceManager and JobManager find and
+	 * confirm each other.
+	 * 
+	 * @param config     The Flink configuration of this component / process.
+	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
+	 *
+	 * @throws IOException
+	 *             Thrown, if the initialization of the Hadoop file system used by YARN fails.
+	 * @throws IllegalConfigurationException
+	 *             Thrown, if the Flink configuration does not properly describe the ResourceManager address and port.
+	 */
+	public YarnIntraNonHaMasterServices(
+			Configuration config,
+			org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+		super(config, hadoopConf);
+
+		// track whether we successfully perform the initialization
+		boolean successful = false;
+
+		try {
+			this.dispatcher = Executors.newSingleThreadExecutor(new ServicesThreadFactory());
+			this.resourceManagerLeaderElectionService = new SingleLeaderElectionService(dispatcher, DEFAULT_LEADER_ID);
+
+			// all good!
+			successful = true;
+		}
+		finally {
+			if (!successful) {
+				// quietly undo what the parent constructor initialized
+				try {
+					super.close();
+				} catch (Throwable ignored) {}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+		enter();
+		try {
+			return resourceManagerLeaderElectionService.createLeaderRetrievalService();
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
+		enter();
+		try {
+			return resourceManagerLeaderElectionService;
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+		enter();
+		try {
+			throw new UnsupportedOperationException("needs refactoring to accept default address");
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+		enter();
+		try {
+			throw new UnsupportedOperationException("needs refactoring to accept default address");
+		}
+		finally {
+			exit();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void close() throws Exception {
+		if (enterUnlessClosed()) {
+			try {
+				try {
+					// this class' own cleanup logic
+					resourceManagerLeaderElectionService.shutdown();
+					dispatcher.shutdownNow();
+				}
+				finally {
+					// in any case, we must call the parent cleanup logic
+					super.close();
+				}
+			}
+			finally {
+				exit();
+			}
+		}
+	}
+}
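
A rough sketch of the intra-process wiring these services target: the ResourceManager takes part in the embedded election while the JobManager looks it up through the matching retriever. The wrapper class is hypothetical, and the contender/listener implementations live in the ResourceManager and JobManager runtime code, so the start(...) calls are left as comments here.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    import org.apache.flink.yarn.highavailability.YarnIntraNonHaMasterServices;

    final class IntraAppMasterWiringSketch {

        static void wire(Configuration flinkConfig,
                org.apache.hadoop.conf.Configuration hadoopConfig) throws Exception {

            YarnIntraNonHaMasterServices services =
                    new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);

            // ResourceManager side: participates in the embedded, in-process election
            LeaderElectionService rmElection = services.getResourceManagerLeaderElectionService();
            // rmElection.start(resourceManagerContender);   // LeaderContender impl lives in the RM

            // JobManager side: discovers the ResourceManager through the matching retriever
            LeaderRetrievalService rmRetriever = services.getResourceManagerLeaderRetriever();
            // rmRetriever.start(leaderListener);             // LeaderRetrievalListener impl lives in the JM

            services.close();
        }

        private IntraAppMasterWiringSketch() {}
    }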

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
new file mode 100644
index 0000000..eb4b77e
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import java.io.IOException;
+
+/**
+ * These YarnHighAvailabilityServices are for use by the TaskManager in setups
+ * where there is one ResourceManager that is statically configured in the Flink configuration.
+ * 
+ * <h3>Handled failure types</h3>
+ * <ul>
+ *     <li><b>User code &amp; operator failures:</b> Failed operators are recovered from checkpoints.</li>
+ *     <li><b>Task Manager Failures:</b> Failed Task Managers are restarted and their tasks are
+ *         recovered from checkpoints.</li>
+ * </ul>
+ *
+ * <h3>Non-recoverable failure types</h3>
+ * <ul>
+ *     <li><b>Application Master failures:</b> These failures are not recoverable, because TaskManagers
+ *     have no way to discover the new Application Master's address.</li>
+ * </ul>
+ *
+ * <p>Internally, these services put their recovery data into YARN's working directory,
+ * except for checkpoints, which are in the configured checkpoint directory. That way,
+ * checkpoints can be resumed with a new job/application, even if the complete YARN application
+ * is killed and cleaned up. 
+ *
+ * <p>A typical YARN setup that uses these HA services first starts the ResourceManager
+ * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which
+ * the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures
+ * of the JobManager and ResourceManager, which are running as part of the Application Master.
+ *
+ * @see HighAvailabilityServices
+ */
+public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServices {
+
+	/** The RPC URL under which the single ResourceManager can be reached while available */ 
+	private final String resourceManagerRpcUrl;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates new YarnPreConfiguredMasterNonHaServices for the given Flink and YARN configuration.
+	 * This constructor parses the ResourceManager address from the Flink configuration and sets
+	 * up the HDFS access to store recovery data in the YARN application's working directory.
+	 * 
+	 * @param config     The Flink configuration of this component / process.
+	 * @param hadoopConf The Hadoop configuration for the YARN cluster.
+	 *
+	 * @throws IOException
+	 *             Thrown, if the initialization of the Hadoop file system used by YARN fails.
+	 * @throws IllegalConfigurationException
+	 *             Thrown, if the Flink configuration does not properly describe the ResourceManager address and port.
+	 */
+	public YarnPreConfiguredMasterNonHaServices(
+			Configuration config,
+			org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+		super(config, hadoopConf);
+
+		// track whether we successfully perform the initialization
+		boolean successful = false;
+
+		try {
+			// extract the hostname and port of the resource manager
+			final String rmHost = config.getString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS);
+			final int rmPort = config.getInteger(YarnConfigOptions.APP_MASTER_RPC_PORT);
+
+			if (rmHost == null) {
+				throw new IllegalConfigurationException("Config parameter '" + 
+						YarnConfigOptions.APP_MASTER_RPC_ADDRESS.key() + "' is missing.");
+			}
+			if (rmPort < 0) {
+				throw new IllegalConfigurationException("Config parameter '" +
+						YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' is missing.");
+			}
+			if (rmPort <= 0 || rmPort >= 65536) {
+				throw new IllegalConfigurationException("Invalid value for '" + 
+						YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' - port must be in [1, 65535]");
+			}
+
+			this.resourceManagerRpcUrl = RpcServiceUtils.getRpcUrl(
+					rmHost, rmPort, RESOURCE_MANAGER_RPC_ENDPOINT_NAME, config);
+
+			// all well!
+			successful = true;
+		}
+		finally {
+			if (!successful) {
+				// quietly undo what the parent constructor initialized
+				try {
+					super.close();
+				} catch (Throwable ignored) {}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+		enter();
+		try {
+			return new StandaloneLeaderRetrievalService(resourceManagerRpcUrl, DEFAULT_LEADER_ID);
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
+		enter();
+		try {
+			throw new UnsupportedOperationException("Not supported on the TaskManager side");
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+		enter();
+		try {
+			throw new UnsupportedOperationException("needs refactoring to accept default address");
+		}
+		finally {
+			exit();
+		}
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+		enter();
+		try {
+			throw new UnsupportedOperationException("needs refactoring to accept default address");
+		}
+		finally {
+			exit();
+		}
+	}
+}
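
A minimal TaskManager-side sketch, assuming a caller that already holds a suitable Hadoop configuration. The two YarnConfigOptions values are exactly the ones the constructor above validates; the concrete values are placeholders borrowed from the tests further down, and the wrapper class is not part of this commit.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    import org.apache.flink.yarn.configuration.YarnConfigOptions;
    import org.apache.flink.yarn.highavailability.YarnHighAvailabilityServices;

    final class TaskManagerHaServicesSketch {

        static void locateResourceManager(
                org.apache.hadoop.conf.Configuration hadoopConfig) throws Exception {

            // the pre-configured services refuse to start without the AppMaster address and port
            Configuration flinkConfig = new Configuration();
            flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");  // placeholder
            flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);           // placeholder

            YarnHighAvailabilityServices services =
                    YarnHighAvailabilityServices.forYarnTaskManager(flinkConfig, hadoopConfig);
            try {
                // the "leader" is simply the statically configured ResourceManager RPC URL
                LeaderRetrievalService rmRetriever = services.getResourceManagerLeaderRetriever();
                // rmRetriever.start(listener);   // listener implementation lives in the TaskManager
            }
            finally {
                services.close();
            }
        }

        private TaskManagerHaServicesSketch() {}
    }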

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
new file mode 100644
index 0000000..0e7bf0f
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class YarnIntraNonHaMasterServicesTest {
+
+	private static final Random RND = new Random();
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+	private static MiniDFSCluster HDFS_CLUSTER;
+
+	private static Path HDFS_ROOT_PATH;
+
+	private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+	// ------------------------------------------------------------------------
+	//  Test setup and shutdown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		final File tempDir = TEMP_DIR.newFolder();
+
+		org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		HDFS_CLUSTER = builder.build();
+		HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI());
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (HDFS_CLUSTER != null) {
+			HDFS_CLUSTER.shutdown();
+		}
+		HDFS_CLUSTER = null;
+		HDFS_ROOT_PATH = null;
+	}
+
+	@Before
+	public void initConfig() {
+		hadoopConfig = new org.apache.hadoop.conf.Configuration();
+		hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, HDFS_ROOT_PATH.toString());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testRepeatedClose() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+
+		final YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
+		services.closeAndCleanupAllData();
+
+		// this should not throw an exception
+		services.close();
+	}
+
+	@Test
+	public void testClosingReportsToLeader() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+
+		try (YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig)) {
+			final LeaderElectionService elector = services.getResourceManagerLeaderElectionService();
+			final LeaderContender contender = mockContender(elector);
+
+			elector.start(contender);
+			services.close();
+
+			verify(contender, timeout(100).times(1)).handleError(any(Exception.class));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static LeaderContender mockContender(final LeaderElectionService service) {
+		String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
+		return mockContender(service, address);
+	}
+
+	private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
+		LeaderContender mockContender = mock(LeaderContender.class);
+
+		when(mockContender.getAddress()).thenReturn(address);
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				final UUID uuid = (UUID) invocation.getArguments()[0];
+				service.confirmLeaderSessionID(uuid);
+				return null;
+			}
+		}).when(mockContender).grantLeadership(any(UUID.class));
+
+		return mockContender;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
new file mode 100644
index 0000000..a13deac
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.*;
+
+
+public class YarnPreConfiguredMasterHaServicesTest extends TestLogger {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+	private static MiniDFSCluster HDFS_CLUSTER;
+
+	private static Path HDFS_ROOT_PATH;
+
+	private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+	// ------------------------------------------------------------------------
+	//  Test setup and shutdown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		final File tempDir = TEMP_DIR.newFolder();
+
+		org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		HDFS_CLUSTER = builder.build();
+		HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI());
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (HDFS_CLUSTER != null) {
+			HDFS_CLUSTER.shutdown();
+		}
+		HDFS_CLUSTER = null;
+		HDFS_ROOT_PATH = null;
+	}
+
+	@Before
+	public void initConfig() {
+		hadoopConfig = new org.apache.hadoop.conf.Configuration();
+		hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, HDFS_ROOT_PATH.toString());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testConstantResourceManagerName() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+		flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+		YarnHighAvailabilityServices services1 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+		YarnHighAvailabilityServices services2 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+
+		try {
+			String rmName1 = services1.getResourceManagerEndpointName();
+			String rmName2 = services2.getResourceManagerEndpointName();
+
+			assertNotNull(rmName1);
+			assertNotNull(rmName2);
+			assertEquals(rmName1, rmName2);
+		}
+		finally {
+			services1.closeAndCleanupAllData();
+			services2.closeAndCleanupAllData();
+		}
+	}
+
+	@Test
+	public void testMissingRmConfiguration() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+
+		// missing resource manager address
+		try {
+			new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+			fail();
+		} catch (IllegalConfigurationException e) {
+			// expected
+		}
+
+		flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+
+		// missing resource manager port
+		try {
+			new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+			fail();
+		} catch (IllegalConfigurationException e) {
+			// expected
+		}
+
+		flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+		// now everything is good ;-)
+		new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig).closeAndCleanupAllData();
+	}
+
+	@Test
+	public void testCloseAndCleanup() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+		flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+		// create the services
+		YarnHighAvailabilityServices services = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+		services.closeAndCleanupAllData();
+
+		final FileSystem fileSystem = HDFS_ROOT_PATH.getFileSystem();
+		final Path workDir = new Path(HDFS_CLUSTER.getFileSystem().getWorkingDirectory().toString());
+		
+		try {
+			fileSystem.getFileStatus(new Path(workDir, YarnHighAvailabilityServices.FLINK_RECOVERY_DATA_DIR));
+			fail("Flink recovery data directory still exists");
+		}
+		catch (FileNotFoundException e) {
+			// expected, because the directory should have been cleaned up
+		}
+
+		assertTrue(services.isClosed());
+
+		// doing another cleanup when the services are closed should fail
+		try {
+			services.closeAndCleanupAllData();
+			fail("should fail with an IllegalStateException");
+		} catch (IllegalStateException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testCallsOnClosedServices() throws Exception {
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+		flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+		YarnHighAvailabilityServices services = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+
+		// this method is not supported
+		try {
+			services.getSubmittedJobGraphStore();
+			fail();
+		} catch (UnsupportedOperationException ignored) {}
+
+
+		services.close();
+
+		// all these methods should fail now
+
+		try {
+			services.createBlobStore();
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getCheckpointRecoveryFactory();
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getJobManagerLeaderElectionService(new JobID());
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getJobManagerLeaderRetriever(new JobID());
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getRunningJobsRegistry();
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getResourceManagerLeaderElectionService();
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		try {
+			services.getResourceManagerLeaderRetriever();
+			fail();
+		} catch (IllegalStateException ignored) {}
+	}
+}