You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/13 20:29:44 UTC
[2/2] flink git commit: [FLINK-5254] [yarn] Implement YARN
High-Availability Services
[FLINK-5254] [yarn] Implement YARN High-Availability Services
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a2d4b36
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a2d4b36
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a2d4b36
Branch: refs/heads/flip-6
Commit: 1a2d4b36809bfa0bc6c993a4f4cb4eda34f82ee2
Parents: 8c448e8
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Dec 5 01:34:32 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 13 13:53:46 2016 +0100
----------------------------------------------------------------------
.../flink/configuration/Configuration.java | 12 +-
.../FsNegativeRunningJobsRegistryTest.java | 121 ++++++
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 20 +-
.../highavailability/EmbeddedNonHaServices.java | 15 +-
.../FsNegativeRunningJobsRegistry.java | 153 ++++++++
.../HighAvailabilityServices.java | 69 +++-
.../runtime/highavailability/NonHaServices.java | 21 +-
.../highavailability/ServicesThreadFactory.java | 40 ++
.../highavailability/ZookeeperHaServices.java | 17 +-
.../SingleLeaderElectionService.java | 384 +++++++++++++++++++
.../nonha/AbstractNonHaServices.java | 29 +-
.../nonha/EmbeddedLeaderService.java | 2 +-
.../StandaloneLeaderRetrievalService.java | 1 +
.../flink/runtime/minicluster/MiniCluster.java | 2 +-
.../resourcemanager/JobLeaderIdService.java | 2 +-
.../resourcemanager/ResourceManagerRunner.java | 3 +-
.../flink/runtime/rpc/RpcServiceUtils.java | 70 ++++
.../FsNegativeRunningJobsRegistryTest.java | 85 ++++
.../TestingHighAvailabilityServices.java | 14 +-
.../SingleLeaderElectionServiceTest.java | 226 +++++++++++
flink-yarn/pom.xml | 53 ++-
.../flink/yarn/YarnApplicationMasterRunner.java | 2 +-
.../yarn/YarnFlinkApplicationMasterRunner.java | 18 +-
.../flink/yarn/YarnTaskExecutorRunner.java | 2 +-
.../yarn/configuration/YarnConfigOptions.java | 49 +++
.../AbstractYarnNonHaServices.java | 105 +++++
.../YarnHighAvailabilityServices.java | 343 +++++++++++++++++
.../YarnIntraNonHaMasterServices.java | 188 +++++++++
.../YarnPreConfiguredMasterNonHaServices.java | 172 +++++++++
.../YarnIntraNonHaMasterServicesTest.java | 149 +++++++
.../YarnPreConfiguredMasterHaServicesTest.java | 234 +++++++++++
31 files changed, 2518 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index f15c669..8f23435 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -44,7 +44,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
implements IOReadableWritable, java.io.Serializable, Cloneable {
private static final long serialVersionUID = 1L;
-
+
private static final byte TYPE_STRING = 0;
private static final byte TYPE_INT = 1;
private static final byte TYPE_LONG = 2;
@@ -52,14 +52,14 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
private static final byte TYPE_FLOAT = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_BYTES = 6;
-
+
/** The log object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);
-
+
/** Stores the concrete key/value pairs of this configuration object. */
protected final HashMap<String, Object> confData;
-
+
// --------------------------------------------------------------------------------------------
/**
@@ -639,12 +639,16 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
Object o = getRawValue(configOption.key());
if (o != null) {
+ // found a value for the current proper key
return o;
}
else if (configOption.hasDeprecatedKeys()) {
+ // try the deprecated keys
for (String deprecatedKey : configOption.deprecatedKeys()) {
Object oo = getRawValue(deprecatedKey);
if (oo != null) {
+ LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+ deprecatedKey, configOption.key());
return oo;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
new file mode 100644
index 0000000..40d75e8
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FsNegativeRunningJobsRegistryTest {
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+ private static MiniDFSCluster HDFS_CLUSTER;
+
+ private static Path HDFS_ROOT_PATH;
+
+ // ------------------------------------------------------------------------
+ // startup / shutdown
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void createHDFS() throws Exception {
+ final File tempDir = TEMP_DIR.newFolder();
+
+ Configuration hdConf = new Configuration();
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ HDFS_CLUSTER = builder.build();
+
+ HDFS_ROOT_PATH = new Path("hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
+ + HDFS_CLUSTER.getNameNodePort() + "/");
+ }
+
+ @AfterClass
+ public static void destroyHDFS() {
+ if (HDFS_CLUSTER != null) {
+ HDFS_CLUSTER.shutdown();
+ }
+ HDFS_CLUSTER = null;
+ HDFS_ROOT_PATH = null;
+ }
+
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testCreateAndSetFinished() throws Exception {
+ final Path workDir = new Path(HDFS_ROOT_PATH, "test-work-dir");
+ final JobID jid = new JobID();
+
+ FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);
+
+ // initially, without any call, the job is considered running
+ assertTrue(registry.isJobRunning(jid));
+
+ // repeated setting should not affect the status
+ registry.setJobRunning(jid);
+ assertTrue(registry.isJobRunning(jid));
+
+ // set the job to finished and validate
+ registry.setJobFinished(jid);
+ assertFalse(registry.isJobRunning(jid));
+
+ // another registry should pick this up
+ FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
+ assertFalse(otherRegistry.isJobRunning(jid));
+ }
+
+ @Test
+ public void testSetFinishedAndRunning() throws Exception {
+ final Path workDir = new Path(HDFS_ROOT_PATH, "änother_wörk_directörü");
+ final JobID jid = new JobID();
+
+ FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);
+
+ // set the job to finished and validate
+ registry.setJobFinished(jid);
+ assertFalse(registry.isJobRunning(jid));
+
+ // set the job back to running and validate
+ registry.setJobRunning(jid);
+ assertTrue(registry.isJobRunning(jid));
+
+ // another registry should pick this up
+ FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
+ assertTrue(otherRegistry.isJobRunning(jid));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 5d7173b..8024c54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -37,6 +37,8 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.conf.Configuration;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
* class is a wrapper class which encapsulated the original Hadoop HDFS API.
@@ -62,7 +64,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
/**
- * Creates a new DistributedFileSystem object to access HDFS
+ * Creates a new DistributedFileSystem object to access HDFS, based on a class name
+ * and picking up the configuration from the class path or the Flink configuration.
*
* @throws IOException
* throw if the required HDFS classes cannot be instantiated
@@ -78,6 +81,21 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
this.fs = instantiateFileSystem(fsClass);
}
+ /**
+ * Creates a new DistributedFileSystem that uses the given Hadoop
+ * {@link org.apache.hadoop.fs.FileSystem} under the hood.
+ *
+ * @param hadoopConfig The Hadoop configuration that the FileSystem is based on.
+ * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+ */
+ public HadoopFileSystem(
+ org.apache.hadoop.conf.Configuration hadoopConfig,
+ org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+
+ this.conf = checkNotNull(hadoopConfig, "hadoopConfig");
+ this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+ }
+
private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() throws IOException {
Class<? extends org.apache.hadoop.fs.FileSystem> fsClass = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index b91cec1..a417599 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -43,6 +43,12 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
// ------------------------------------------------------------------------
@Override
+ public String getResourceManagerEndpointName() {
+ // dynamic actor name
+ return null;
+ }
+
+ @Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return resourceManagerLeaderService.createLeaderRetrievalService();
}
@@ -55,11 +61,16 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
// ------------------------------------------------------------------------
@Override
- public void shutdown() throws Exception {
+ public void close() throws Exception {
try {
- super.shutdown();
+ super.close();
} finally {
resourceManagerLeaderService.shutdown();
}
}
+
+ @Override
+ public void closeAndCleanupAllData() throws Exception {
+ close();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
new file mode 100644
index 0000000..9d8b226
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@link RunningJobsRegistry} tracks the status of jobs via marker files:
+ * a job is considered finished once its marker file exists.
+ *
+ * <p>The general contract is the following:
+ * <ul>
+ * <li>Initially, a marker file does not exist (no one created it, yet), which means
+ * the specific job is assumed to be running</li>
+ * <li>The JobManager that finishes calls this service to create the marker file,
+ * which marks the job as finished.</li>
+ * <li>If a JobManager gains leadership at some point when shutdown is in progress,
+ * it will see the marker file and realize that the job is finished.</li>
+ * <li>The application framework is expected to clean the file once the application
+ * is completely shut down. At that point, no JobManager will attempt to
+ * start the job, even if it gains leadership.</li>
+ * </ul>
+ *
+ * <p>It is especially tailored towards deployment modes like for example
+ * YARN, where HDFS is available as a persistent file system, and the YARN
+ * application's working directories on HDFS are automatically cleaned
+ * up after the application completed.
+ */
+public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
+
+ private static final String PREFIX = ".job_complete_";
+
+ private final FileSystem fileSystem;
+
+ private final Path basePath;
+
+ /**
+ * Creates a new registry that writes to the FileSystem and working directory
+ * denoted by the given path.
+ *
+ * <p>The initialization will attempt to write to the given working directory, in
+ * order to catch setup/configuration errors early.
+ *
+ * @param workingDirectory The working directory for files to track the job status.
+ *
+ * @throws IOException Thrown, if the specified directory cannot be accessed.
+ */
+ public FsNegativeRunningJobsRegistry(Path workingDirectory) throws IOException {
+ this(workingDirectory.getFileSystem(), workingDirectory);
+ }
+
+ /**
+ * Creates a new registry that writes its files to the given FileSystem at
+ * the given working directory path.
+ *
+ * <p>The initialization will attempt to write to the given working directory, in
+ * order to catch setup/configuration errors early.
+ *
+ * @param fileSystem The FileSystem to use for the marker files.
+ * @param workingDirectory The working directory for files to track the job status.
+ *
+ * @throws IOException Thrown, if the specified directory cannot be accessed.
+ */
+ public FsNegativeRunningJobsRegistry(FileSystem fileSystem, Path workingDirectory) throws IOException {
+ this.fileSystem = checkNotNull(fileSystem, "fileSystem");
+ this.basePath = checkNotNull(workingDirectory, "workingDirectory");
+
+ // to be safe, attempt to write to the working directory, to
+ // catch problems early
+ final Path testFile = new Path(workingDirectory, ".registry_test");
+ try (FSDataOutputStream out = fileSystem.create(testFile, false)) {
+ out.write(42);
+ }
+ catch (IOException e) {
+ throw new IOException("Unable to write to working directory: " + workingDirectory, e);
+ }
+ finally {
+ fileSystem.delete(testFile, false);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void setJobRunning(JobID jobID) throws IOException {
+ checkNotNull(jobID, "jobID");
+ final Path filePath = createMarkerFilePath(jobID);
+
+ // delete the marker file, if it exists
+ try {
+ fileSystem.delete(filePath, false);
+ }
+ catch (FileNotFoundException e) {
+ // apparently job was already considered running
+ }
+ }
+
+ @Override
+ public void setJobFinished(JobID jobID) throws IOException {
+ checkNotNull(jobID, "jobID");
+ final Path filePath = createMarkerFilePath(jobID);
+
+ // create the file
+ // to avoid an exception if the marker file already exists, set overwrite=true
+ try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
+ out.write(42);
+ }
+ }
+
+ @Override
+ public boolean isJobRunning(JobID jobID) throws IOException {
+ checkNotNull(jobID, "jobID");
+
+ // check for the existence of the file
+ try {
+ fileSystem.getFileStatus(createMarkerFilePath(jobID));
+ // file was found --> job is terminated
+ return false;
+ }
+ catch (FileNotFoundException e) {
+ // file does not exist, job is still running
+ return true;
+ }
+ }
+
+ private Path createMarkerFilePath(JobID jobId) {
+ return new Path(basePath, PREFIX + jobId.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 360de7b..4169204 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -26,19 +26,45 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import java.io.IOException;
+import java.util.UUID;
/**
- * This class gives access to all services needed for
- *
+ * The HighAvailabilityServices give access to all services needed for a highly-available
+ * setup. In particular, the services provide access to highly available storage and
+ * registries, as well as distributed counters and leader election.
+ *
* <ul>
* <li>ResourceManager leader election and leader retrieval</li>
* <li>JobManager leader election and leader retrieval</li>
* <li>Persistence for checkpoint metadata</li>
* <li>Registering the latest completed checkpoint(s)</li>
- * <li>Persistence for submitted job graph</li>
+ * <li>Persistence for the BLOB store</li>
+ * <li>Registry that marks a job's status</li>
+ * <li>Naming of RPC endpoints</li>
* </ul>
*/
-public interface HighAvailabilityServices {
+public interface HighAvailabilityServices extends AutoCloseable {
+
+ // ------------------------------------------------------------------------
+ // Constants
+ // ------------------------------------------------------------------------
+
+ /**
+ * This UUID should be used when no proper leader election happens, but a simple
+ * pre-configured leader is used. That is for example the case in non-highly-available
+ * standalone setups.
+ */
+ UUID DEFAULT_LEADER_ID = new UUID(0, 0);
+
+ // ------------------------------------------------------------------------
+ // Endpoint Naming
+ // ------------------------------------------------------------------------
+
+ String getResourceManagerEndpointName();
+
+ // ------------------------------------------------------------------------
+ // Services
+ // ------------------------------------------------------------------------
/**
* Gets the leader retriever for the cluster's resource manager.
@@ -88,7 +114,7 @@ public interface HighAvailabilityServices {
*
* @return Running job registry to retrieve running jobs
*/
- RunningJobsRegistry getRunningJobsRegistry();
+ RunningJobsRegistry getRunningJobsRegistry() throws Exception;
/**
* Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
@@ -99,11 +125,38 @@ public interface HighAvailabilityServices {
BlobStore createBlobStore() throws IOException;
// ------------------------------------------------------------------------
+ // Shutdown and Cleanup
+ // ------------------------------------------------------------------------
/**
- * Shut the high availability service down.
+ * Closes the high availability services, releasing all resources.
+ *
+ * <p>This method <b>does not delete or clean up</b> any data stored in external stores
+ * (file systems, ZooKeeper, etc). Another instance of the high availability
+ * services will be able to recover the job.
+ *
+ * <p>If an exception occurs during closing services, this method will attempt to
+ * continue closing other services and report exceptions only after all services
+ * have been attempted to be closed.
*
- * @throws Exception if the shut down fails
+ * @throws Exception Thrown, if an exception occurred while closing these services.
+ */
+ @Override
+ void close() throws Exception;
+
+ /**
+ * Closes the high availability services (releasing all resources) and deletes
+ * all data stored by these services in external stores.
+ *
+ * <p>After this method was called, any job or session that was managed by
+ * these high availability services will be unrecoverable.
+ *
+ * <p>If an exception occurs during cleanup, this method will attempt to
+ * continue the cleanup and report exceptions only after all cleanup steps have
+ * been attempted.
+ *
+ * @throws Exception Thrown, if an exception occurred while closing these services
+ * or cleaning up data stored by them.
*/
- void shutdown() throws Exception;
+ void closeAndCleanupAllData() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 75f44ed..d644fb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,14 +18,12 @@
package org.apache.flink.runtime.highavailability;
+import org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService;
import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-import java.util.UUID;
-
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -39,6 +37,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class NonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
+ /** The constant name of the ResourceManager RPC endpoint */
+ private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
+
/** The fix address of the ResourceManager */
private final String resourceManagerAddress;
@@ -53,16 +54,26 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi
}
// ------------------------------------------------------------------------
+ // Names
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String getResourceManagerEndpointName() {
+ return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
+ }
+
+
+ // ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
- return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
+ return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
}
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
- return new StandaloneLeaderElectionService();
+ return new SingleLeaderElectionService(getExecutorService(), DEFAULT_LEADER_ID);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
new file mode 100644
index 0000000..24667e4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ServicesThreadFactory implements ThreadFactory {
+
+ private AtomicInteger enumerator = new AtomicInteger();
+
+ @Override
+ public Thread newThread(@Nonnull Runnable r) {
+ Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
+
+ // HA threads should have a very high priority, but not
+ // keep the JVM running by themselves
+ thread.setPriority(Thread.MAX_PRIORITY);
+ thread.setDaemon(true);
+
+ return thread;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 07c5011..bf0b970 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -107,6 +107,16 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
// ------------------------------------------------------------------------
@Override
+ public String getResourceManagerEndpointName() {
+ // since the resource manager name must be dynamic, we return null here
+ return null;
+ }
+
+ // ------------------------------------------------------------------------
+ // Services
+ // ------------------------------------------------------------------------
+
+ @Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@@ -173,10 +183,15 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
// ------------------------------------------------------------------------
@Override
- public void shutdown() throws Exception {
+ public void close() throws Exception {
client.close();
}
+ @Override
+ public void closeAndCleanupAllData() throws Exception {
+ close();
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
new file mode 100644
index 0000000..26e3cbf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.leaderelection;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the {@link LeaderElectionService} interface that handles a single
+ * leader contender. When started, this service immediately grants the contender the leadership.
+ *
+ * <p>The implementation accepts a single static leader session ID and is hence compatible with
+ * pre-configured single leader (no leader failover) setups.
+ *
+ * <p>This implementation supports a series of leader listeners that receive notifications about
+ * the leader contender.
+ */
+public class SingleLeaderElectionService implements LeaderElectionService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SingleLeaderElectionService.class);
+
+ // ------------------------------------------------------------------------
+
+ /** lock for all operations on this instance */
+ private final Object lock = new Object();
+
+ /** The executor service that dispatches notifications */
+ private final Executor notificationExecutor;
+
+ /** The leader ID assigned to the immediate leader */
+ private final UUID leaderId;
+
+ @GuardedBy("lock")
+ private final HashSet<EmbeddedLeaderRetrievalService> listeners;
+
+ /** The currently proposed leader */
+ @GuardedBy("lock")
+ private volatile LeaderContender proposedLeader;
+
+ /** The confirmed leader */
+ @GuardedBy("lock")
+ private volatile LeaderContender leader;
+
+ /** The address of the confirmed leader */
+ @GuardedBy("lock")
+ private volatile String leaderAddress;
+
+ /** Flag marking this service as shutdown, meaning it cannot be started again */
+ @GuardedBy("lock")
+ private volatile boolean shutdown;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new leader election service. The service assigns the given leader ID
+ * to the leader contender.
+ * @param notificationsDispatcher The executor that runs the notification calls; must not be null.
+ * @param leaderId The constant leader ID assigned to the leader.
+ */
+ public SingleLeaderElectionService(Executor notificationsDispatcher, UUID leaderId) {
+ this.notificationExecutor = checkNotNull(notificationsDispatcher);
+ this.leaderId = checkNotNull(leaderId);
+ this.listeners = new HashSet<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // leader election service
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void start(LeaderContender contender) throws Exception {
+ checkNotNull(contender, "contender");
+
+ synchronized (lock) {
+ checkState(!shutdown, "service is shut down");
+ checkState(proposedLeader == null, "service already started");
+
+ // directly grant leadership to the given contender
+ proposedLeader = contender;
+ notificationExecutor.execute(new GrantLeadershipCall(contender, leaderId));
+ }
+ }
+
+ @Override
+ public void stop() {
+ synchronized (lock) {
+ // notify all listeners that there is no leader
+ for (EmbeddedLeaderRetrievalService listener : listeners) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(null, null, listener.listener, LOG));
+ }
+
+ // if there was a leader, revoke its leadership
+ if (leader != null) {
+ try {
+ leader.revokeLeadership();
+ } catch (Throwable t) {
+ leader.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+ }
+ }
+
+ proposedLeader = null;
+ leader = null;
+ leaderAddress = null;
+ }
+ }
+
+ @Override
+ public void confirmLeaderSessionID(UUID leaderSessionID) {
+ checkNotNull(leaderSessionID, "leaderSessionID");
+ checkArgument(leaderSessionID.equals(leaderId), "confirmed wrong leader session id");
+
+ synchronized (lock) {
+ checkState(!shutdown, "service is shut down");
+ checkState(proposedLeader != null, "no leader proposed yet");
+ checkState(leader == null, "leader already confirmed");
+
+ // accept the confirmation
+ final String address = proposedLeader.getAddress();
+ leaderAddress = address;
+ leader = proposedLeader;
+
+ // notify all listeners
+ for (EmbeddedLeaderRetrievalService listener : listeners) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(address, leaderId, listener.listener, LOG));
+ }
+ }
+ }
+
+ @Override
+ public boolean hasLeadership() {
+ synchronized (lock) {
+ return leader != null;
+ }
+ }
+
+ void errorOnGrantLeadership(LeaderContender contender, Throwable error) {
+ LOG.warn("Error notifying leader listener about new leader", error);
+ contender.handleError(error instanceof Exception ? (Exception) error : new Exception(error));
+
+ synchronized (lock) {
+ if (proposedLeader == contender) {
+ proposedLeader = null;
+ leader = null;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // shutdown
+ // ------------------------------------------------------------------------
+
+ public boolean isShutdown() {
+ return shutdown;
+ }
+
+ public void shutdown() {
+ shutdownInternally(new Exception("The leader service is shutting down"));
+ }
+
+ private void shutdownInternally(Exception exceptionForHandlers) {
+ synchronized (lock) {
+ if (shutdown) {
+ return;
+ }
+
+ shutdown = true;
+
+ // fail the leader (if there is one)
+ if (leader != null) {
+ try {
+ leader.handleError(exceptionForHandlers);
+ } catch (Throwable ignored) {}
+ }
+
+ // clear all leader status
+ leader = null;
+ proposedLeader = null;
+ leaderAddress = null;
+
+ // fail all registered listeners
+ for (EmbeddedLeaderRetrievalService service : listeners) {
+ service.shutdown(exceptionForHandlers);
+ }
+ listeners.clear();
+ }
+ }
+
+ private void fatalError(Throwable error) {
+ LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+ shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+ }
+
+ // ------------------------------------------------------------------------
+ // leader listeners
+ // ------------------------------------------------------------------------
+
+ public LeaderRetrievalService createLeaderRetrievalService() {
+ checkState(!shutdown, "leader election service is shut down");
+ return new EmbeddedLeaderRetrievalService();
+ }
+
+ void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
+ synchronized (lock) {
+ checkState(!shutdown, "leader election service is shut down");
+ checkState(!service.running, "leader retrieval service is already started");
+
+ try {
+ if (!listeners.add(service)) {
+ throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+ }
+
+ service.listener = listener;
+ service.running = true;
+
+ // if we already have a leader, immediately notify this new listener
+ if (leader != null) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(leaderAddress, leaderId, listener, LOG));
+ }
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ void removeListener(EmbeddedLeaderRetrievalService service) {
+ synchronized (lock) {
+ // if the service was not even started, simply do nothing
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ if (!listeners.remove(service)) {
+ throw new IllegalStateException("leader retrieval service does not belong to this service");
+ }
+
+ // stop the service
+ service.listener = null;
+ service.running = false;
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+ volatile LeaderRetrievalListener listener;
+
+ volatile boolean running;
+
+ @Override
+ public void start(LeaderRetrievalListener listener) throws Exception {
+ checkNotNull(listener);
+ addListener(this, listener);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ removeListener(this);
+ }
+
+ void shutdown(Exception cause) {
+ if (running) {
+ final LeaderRetrievalListener lst = listener;
+ running = false;
+ listener = null;
+
+ try {
+ lst.handleError(cause);
+ } catch (Throwable ignored) {}
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // asynchronous notifications
+ // ------------------------------------------------------------------------
+
+ /**
+ * This runnable informs a leader contender that it gained leadership.
+ */
+ private class GrantLeadershipCall implements Runnable {
+
+ private final LeaderContender contender;
+ private final UUID leaderSessionId;
+
+ GrantLeadershipCall(LeaderContender contender, UUID leaderSessionId) {
+
+ this.contender = checkNotNull(contender);
+ this.leaderSessionId = checkNotNull(leaderSessionId);
+ }
+
+ @Override
+ public void run() {
+ try {
+ contender.grantLeadership(leaderSessionId);
+ }
+ catch (Throwable t) {
+ errorOnGrantLeadership(contender, t);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * This runnable informs a leader listener of a new leader
+ */
+ private static class NotifyOfLeaderCall implements Runnable {
+
+ @Nullable
+ private final String address; // null if leader revoked without new leader
+ @Nullable
+ private final UUID leaderSessionId; // null if leader revoked without new leader
+
+ private final LeaderRetrievalListener listener;
+ private final Logger logger;
+
+ NotifyOfLeaderCall(
+ @Nullable String address,
+ @Nullable UUID leaderSessionId,
+ LeaderRetrievalListener listener,
+ Logger logger) {
+
+ this.address = address;
+ this.leaderSessionId = leaderSessionId;
+ this.listener = checkNotNull(listener);
+ this.logger = checkNotNull(logger);
+ }
+
+ @Override
+ public void run() {
+ try {
+ listener.notifyLeaderAddress(address, leaderSessionId);
+ }
+ catch (Throwable t) {
+ logger.warn("Error notifying leader listener about new leader", t);
+ listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+ }
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 474faa8..b10e414 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -25,19 +25,17 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.ServicesThreadFactory;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -132,7 +130,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
// ------------------------------------------------------------------------
@Override
- public void shutdown() throws Exception {
+ public void close() throws Exception {
synchronized (lock) {
if (!shutdown) {
shutdown = true;
@@ -149,6 +147,12 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
}
}
+ @Override
+ public void closeAndCleanupAllData() throws Exception {
+ // this stores no data, so this method is the same as 'close()'
+ close();
+ }
+
private void checkNotShutdown() {
checkState(!shutdown, "high availability services are shut down");
}
@@ -160,21 +164,4 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
protected ExecutorService getExecutorService() {
return executor;
}
-
- private static final class ServicesThreadFactory implements ThreadFactory {
-
- private AtomicInteger enumerator = new AtomicInteger();
-
- @Override
- public Thread newThread(@Nonnull Runnable r) {
- Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
-
- // HA threads should have a very high priority, but not
- // keep the JVM running by themselves
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.setDaemon(true);
-
- return thread;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
index 9fad9be..d4eba26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -461,7 +461,7 @@ public class EmbeddedLeaderService {
contender.grantLeadership(leaderSessionId);
}
catch (Throwable t) {
- logger.warn("Error notifying leader listener about new leader", t);
+ logger.warn("Error granting leadership to contender", t);
contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index 16b163c..4ad4646 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -51,6 +51,7 @@ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService
*
* @param leaderAddress The leader's pre-configured address
*/
+ @Deprecated
public StandaloneLeaderRetrievalService(String leaderAddress) {
this.leaderAddress = checkNotNull(leaderAddress);
this.leaderId = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 29a6e59..1933554 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -345,7 +345,7 @@ public class MiniCluster {
// shut down high-availability services
if (haServices != null) {
try {
- haServices.shutdown();
+ haServices.closeAndCleanupAllData();
} catch (Exception e) {
exception = firstOrSuppressed(e, exception);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 56e72c0..6c7e249 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -59,7 +59,7 @@ public class JobLeaderIdService {
/** Actions to call when the job leader changes */
private JobLeaderIdActions jobLeaderIdActions;
- public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) {
+ public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception {
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 959b727..e0dee0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -46,7 +45,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
final Configuration configuration,
final RpcService rpcService,
final HighAvailabilityServices highAvailabilityServices,
- final MetricRegistry metricRegistry) throws ConfigurationException {
+ final MetricRegistry metricRegistry) throws Exception {
Preconditions.checkNotNull(configuration);
Preconditions.checkNotNull(rpcService);
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index 1ac54ac..018c3ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -21,19 +21,36 @@ package org.apache.flink.runtime.rpc;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.NetUtils;
+
import org.jboss.netty.channel.ChannelException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.UnknownHostException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * These RPC utilities contain helper methods around RPC use, such as starting an RPC service,
+ * or constructing RPC addresses.
+ */
public class RpcServiceUtils {
+
private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);
+ // ------------------------------------------------------------------------
+ // RPC instantiation
+ // ------------------------------------------------------------------------
+
/**
* Utility method to create RPC service from configuration and hostname, port.
*
@@ -78,4 +95,57 @@ public class RpcServiceUtils {
final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
return new AkkaRpcService(actorSystem, timeout);
}
+
+ // ------------------------------------------------------------------------
+ // RPC endpoint addressing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Builds the RPC URL of an endpoint; the secure protocol is used only if the config enables SSL for both Akka and Flink.
+ * @param hostname The hostname or address where the target RPC service is listening.
+ * @param port The port where the target RPC service is listening.
+ * @param endpointName The name of the RPC endpoint.
+ * @param config The configuration from which to deduce further settings.
+ *
+ * @return The RPC URL of the specified RPC endpoint.
+ */
+ public static String getRpcUrl(String hostname, int port, String endpointName, Configuration config)
+ throws UnknownHostException {
+
+ checkNotNull(config, "config is null");
+
+ final boolean sslEnabled = config.getBoolean(
+ ConfigConstants.AKKA_SSL_ENABLED,
+ ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+ SSLUtils.getSSLEnabled(config);
+
+ return getRpcUrl(hostname, port, endpointName, sslEnabled);
+ }
+
+ /**
+ * Builds the URL {@code <protocol>://flink@<host:port>/user/<endpointName>}; protocol is "akka.ssl.tcp" if secure, else "akka.tcp".
+ * @param hostname The hostname or address where the target RPC service is listening.
+ * @param port The port where the target RPC service is listening.
+ * @param endpointName The name of the RPC endpoint.
+ * @param secure True, if security/encryption is enabled, false otherwise.
+ *
+ * @return The RPC URL of the specified RPC endpoint.
+ */
+ public static String getRpcUrl(String hostname, int port, String endpointName, boolean secure)
+ throws UnknownHostException {
+
+ checkNotNull(hostname, "hostname is null");
+ checkNotNull(endpointName, "endpointName is null");
+ checkArgument(port > 0 && port <= 65535, "port must be in [1, 65535]");
+
+ final String protocol = secure ? "akka.ssl.tcp" : "akka.tcp";
+ final String hostPort = NetUtils.hostAndPortToUrlString(hostname, port);
+
+ return String.format("%s://flink@%s/user/%s", protocol, hostPort, endpointName);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** This class is not meant to be instantiated */
+ private RpcServiceUtils() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
new file mode 100644
index 0000000..f1ece0e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FsNegativeRunningJobsRegistryTest extends TestLogger {
+
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testCreateAndSetFinished() throws Exception {
+ final File folder = tempFolder.newFolder();
+ final String uri = folder.toURI().toString();
+
+ final JobID jid = new JobID();
+
+ FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri));
+
+ // initially, without any call, the job is considered running
+ assertTrue(registry.isJobRunning(jid));
+
+ // repeated setting should not affect the status
+ registry.setJobRunning(jid);
+ assertTrue(registry.isJobRunning(jid));
+
+ // set the job to finished and validate
+ registry.setJobFinished(jid);
+ assertFalse(registry.isJobRunning(jid));
+
+ // another registry should pick this up
+ FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
+ assertFalse(otherRegistry.isJobRunning(jid));
+ }
+
+ @Test
+ public void testSetFinishedAndRunning() throws Exception {
+ final File folder = tempFolder.newFolder();
+ final String uri = folder.toURI().toString();
+
+ final JobID jid = new JobID();
+
+ FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri));
+
+ // set the job to finished and validate
+ registry.setJobFinished(jid);
+ assertFalse(registry.isJobRunning(jid));
+
+ // set the job back to running and validate
+ registry.setJobRunning(jid);
+ assertTrue(registry.isJobRunning(jid));
+
+ // another registry should pick this up
+ FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
+ assertTrue(otherRegistry.isJobRunning(jid));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index e0f71ee..3f9865c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -155,12 +155,22 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
return new VoidBlobStore();
}
+ @Override
+ public String getResourceManagerEndpointName() {
+ throw new UnsupportedOperationException();
+ }
+
// ------------------------------------------------------------------------
// Shutdown
// ------------------------------------------------------------------------
@Override
- public void shutdown() throws Exception {
- // nothing to do, since this should not shut down individual services, but cross service parts
+ public void close() throws Exception {
+ // nothing to do
+ }
+
+ @Override
+ public void closeAndCleanupAllData() throws Exception {
+ // nothing to do
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
new file mode 100644
index 0000000..a9805a1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.leaderelection;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link SingleLeaderElectionService}.
+ */
+public class SingleLeaderElectionServiceTest {
+
+ private static final Random RND = new Random();
+
+ private final Executor executor = Executors.directExecutor();
+
+ // ------------------------------------------------------------------------
+ /** Start grants leadership, stop revokes it, and a restart grants leadership only to the new contender. */
+ @Test
+ public void testStartStopAssignLeadership() throws Exception {
+ final UUID uuid = UUID.randomUUID();
+ final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+ final LeaderContender contender = mockContender(service);
+ final LeaderContender otherContender = mockContender(service);
+
+ service.start(contender);
+ verify(contender, times(1)).grantLeadership(uuid);
+
+ service.stop();
+ verify(contender, times(1)).revokeLeadership();
+
+ // start with a new contender - the old contender must not gain another leadership
+ service.start(otherContender);
+ verify(otherContender, times(1)).grantLeadership(uuid);
+
+ verify(contender, times(1)).grantLeadership(uuid);
+ verify(contender, times(1)).revokeLeadership();
+ }
+ /** Stopping the service before the contender confirmed its leadership must not trigger a revoke call. */
+ @Test
+ public void testStopBeforeConfirmingLeadership() throws Exception {
+ final UUID uuid = UUID.randomUUID();
+ final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+ final LeaderContender contender = mock(LeaderContender.class);
+
+ service.start(contender);
+ verify(contender, times(1)).grantLeadership(uuid);
+
+ service.stop();
+
+ // because the leadership was never confirmed, there is no "revoke" call
+ verifyNoMoreInteractions(contender);
+ }
+ /** A service that is already started rejects every further start call with an IllegalStateException. */
+ @Test
+ public void testStartOnlyOnce() throws Exception {
+ final UUID uuid = UUID.randomUUID();
+ final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+ final LeaderContender contender = mock(LeaderContender.class);
+ final LeaderContender otherContender = mock(LeaderContender.class);
+
+ service.start(contender);
+ verify(contender, times(1)).grantLeadership(uuid);
+
+ // should not be possible to start this again with another contender
+ try {
+ service.start(otherContender);
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // should not be possible to start this again with the same contender
+ try {
+ service.start(contender);
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ }
+ /** Shutdown reports an error to the contender and to still-running listeners, and prevents a restart. */
+ @Test
+ public void testShutdown() throws Exception {
+ final UUID uuid = UUID.randomUUID();
+ final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+
+ // create a leader contender and let it grab leadership
+ final LeaderContender contender = mockContender(service);
+ service.start(contender);
+ verify(contender, times(1)).grantLeadership(uuid);
+
+ // some leader listeners
+ final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
+ final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
+
+ LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
+ LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
+
+ listenerService1.start(listener1);
+ listenerService2.start(listener2);
+
+ // one listener stops
+ listenerService1.stop();
+
+ // shut down the service
+ service.shutdown();
+
+ // the leader contender and running listener should get error notifications
+ verify(contender, times(1)).handleError(any(Exception.class));
+ verify(listener2, times(1)).handleError(any(Exception.class));
+
+ // the stopped listener gets no notification
+ verify(listener1, times(0)).handleError(any(Exception.class));
+
+ // should not be possible to start again after shutdown
+ try {
+ service.start(contender);
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // no additional leadership grant
+ verify(contender, times(1)).grantLeadership(any(UUID.class));
+ }
+ /** A service that was shut down before it was ever started rejects start calls and grants no leadership. */
+ @Test
+ public void testImmediateShutdown() throws Exception {
+ final UUID uuid = UUID.randomUUID();
+ final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+ service.shutdown();
+
+ final LeaderContender contender = mock(LeaderContender.class);
+
+ // should not be possible to start
+ try {
+ service.start(contender);
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // the contender must never have been granted leadership
+ verify(contender, times(0)).grantLeadership(any(UUID.class));
+ }
+
+// @Test
+// public void testNotifyListenersWhenLeaderElected() throws Exception {
+// final UUID uuid = UUID.randomUUID();
+// final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid);
+//
+// final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class);
+// final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class);
+//
+// LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService();
+// LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService();
+//
+// listenerService1.start(listener1);
+// listenerService1.start(listener2); // NOTE(review): presumably meant listenerService2 - confirm before enabling
+//
+// final LeaderContender contender = mockContender(service);
+// service.start(contender);
+//
+// TODO: this disabled test is unfinished - the verification of the listener notifications is still missing
+// }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+ /** Creates a mock contender (see the two-argument variant) that reports a randomly generated address. */
+ private static LeaderContender mockContender(final LeaderElectionService service) {
+ String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
+ return mockContender(service, address);
+ }
+ /** Creates a mock contender that reports the given address and confirms each granted session ID at the service. */
+ private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
+ LeaderContender mockContender = mock(LeaderContender.class);
+
+ when(mockContender.getAddress()).thenReturn(address);
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final UUID uuid = (UUID) invocation.getArguments()[0];
+ service.confirmLeaderSessionID(uuid);
+ return null;
+ }
+ }).when(mockContender).grantLeadership(any(UUID.class));
+
+ return mockContender;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 9ce57d5..7eb220d 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -32,16 +32,13 @@ under the License.
<packaging>jar</packaging>
<dependencies>
+
+ <!-- core dependencies -->
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>hadoop-core</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
@@ -57,46 +54,48 @@ under the License.
</dependency>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
</dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor_${scala.binary.version}</artifactId>
- </dependency>
+ <!-- test dependencies -->
<dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-remote_${scala.binary.version}</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-camel_${scala.binary.version}</artifactId>
+ <artifactId>akka-testkit_${scala.binary.version}</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-testkit_${scala.binary.version}</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
+ <type>test-jar</type>
+ <version>${hadoop.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.10</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
<scope>test</scope>
+ <type>test-jar</type>
+ <version>${hadoop.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 8e3418c..61208c4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -108,7 +108,7 @@ public class YarnApplicationMasterRunner {
* @param args The command line arguments.
*/
public static void main(String[] args) {
- EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
+ EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index e58c77e..188d9ef 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -57,7 +56,12 @@ import java.io.FileInputStream;
import java.io.ObjectInputStream;
/**
- * This class is the executable entry point for the YARN application master.
+ * This class is the executable entry point for the YARN Application Master that
+ * executes a single Flink job and then shuts the YARN application down.
+ *
+ * <p>The lifetime of the YARN application is bound to that of the Flink job. Other
+ * YARN Application Master implementations are for example the YARN session.
+ *
* It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobManagerRunner}
* and {@link org.apache.flink.yarn.YarnResourceManager}.
*
@@ -74,6 +78,8 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
/** The job graph file path */
private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+ // ------------------------------------------------------------------------
+
/** The lock to guard startup / shutdown / manipulation methods */
private final Object lock = new Object();
@@ -105,7 +111,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
* @param args The command line arguments.
*/
public static void main(String[] args) {
- EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster runner", args);
+ EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
@@ -127,7 +133,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
synchronized (lock) {
+ LOG.info("Starting High Availability Services");
haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+
metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
commonRpcService = createRpcService(config, appMasterHostname, amPortRange);
@@ -176,7 +184,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
}
- private ResourceManager createResourceManager(Configuration config) throws ConfigurationException {
+ private ResourceManager<?> createResourceManager(Configuration config) throws Exception {
final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(haServices);
@@ -242,7 +250,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
}
if (haServices != null) {
try {
- haServices.shutdown();
+ haServices.close();
} catch (Throwable tt) {
LOG.warn("Failed to stop the HA service", tt);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index d9912eb..7808152 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -240,7 +240,7 @@ public class YarnTaskExecutorRunner {
}
if (haServices != null) {
try {
- haServices.shutdown();
+ haServices.close();
} catch (Throwable tt) {
LOG.warn("Failed to stop the HA service", tt);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
new file mode 100644
index 0000000..c3902d3
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by Flink's YARN runners.
+ * These options are not expected to be ever configured by users explicitly.
+ */
+public class YarnConfigOptions {
+
+ /**
+ * The hostname or address where the application master RPC system is listening (no default value).
+ */
+ public static final ConfigOption<String> APP_MASTER_RPC_ADDRESS =
+ key("yarn.appmaster.rpc.address")
+ .noDefaultValue();
+
+ /**
+ * The port where the application master RPC system is listening (default: -1). Uses its own key, distinct from the address option.
+ */
+ public static final ConfigOption<Integer> APP_MASTER_RPC_PORT =
+ key("yarn.appmaster.rpc.port")
+ .defaultValue(-1);
+
+ // ------------------------------------------------------------------------
+
+ /** This class is not meant to be instantiated */
+ private YarnConfigOptions() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a2d4b36/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
new file mode 100644
index 0000000..7aa481f
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for the high availability services for Flink YARN applications that support
+ * no master fail over.
+ *
+ * <p>Internally, these services put their recovery data into YARN's working directory,
+ * except for checkpoints, which are in the configured checkpoint directory. That way,
+ * checkpoints can be resumed with a new job/application, even if the complete YARN application
+ * is killed and cleaned up.
+ */
+public abstract class AbstractYarnNonHaServices extends YarnHighAvailabilityServices {
+
+ /** Fixed name under which the ResourceManager RPC endpoint is registered */
+ protected static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Sets up the YARN services without master fail-over. The file system and the directory
+ * for recovery data are derived from the working directory of the given Hadoop configuration.
+ *
+ * <p>The default Hadoop file system configured in the given Hadoop configuration
+ * is required to be an HDFS.
+ *
+ * @param config The Flink configuration of this component / process.
+ * @param hadoopConf The Hadoop configuration for the YARN cluster.
+ *
+ * @throws IOException Thrown, if the Hadoop file system used by YARN cannot be initialized.
+ */
+ protected AbstractYarnNonHaServices(
+ Configuration config,
+ org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+ super(config, hadoopConf);
+ }
+
+ // ------------------------------------------------------------------------
+ // Names
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String getResourceManagerEndpointName() {
+ return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
+ }
+
+ // ------------------------------------------------------------------------
+ // Services
+ // ------------------------------------------------------------------------
+
+ @Override
+ public RunningJobsRegistry getRunningJobsRegistry() throws IOException {
+ enter();
+ try {
+ // IMPORTANT: the registry's data must NOT live in a directory that these services clean up.
+ final RunningJobsRegistry registry =
+ new FsNegativeRunningJobsRegistry(flinkFileSystem, workingDirectory);
+ return registry;
+ } finally {
+ exit();
+ }
+ }
+
+ @Override
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+ enter();
+ try {
+ final CheckpointRecoveryFactory factory = new StandaloneCheckpointRecoveryFactory();
+ return factory;
+ } finally {
+ exit();
+ }
+ }
+
+ @Override
+ public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+ throw new UnsupportedOperationException("These High-Availability Services do not support storing job graphs");
+ }
+}