You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:22 UTC
[03/52] [abbrv] flink git commit: [FLINK-4375] [distributed
coordination] Implement new JobManager creation, initialization,
and basic RPC methods
[FLINK-4375] [distributed coordination] Implement new JobManager creation, initialization, and basic RPC methods
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6486067
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6486067
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6486067
Branch: refs/heads/master
Commit: c64860677fb576f00c700a62a4ebb4ed415bebda
Parents: 4e5f423
Author: Kurt Young <yk...@gmail.com>
Authored: Wed Oct 12 23:25:16 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:22 2016 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/util/StringUtils.java | 14 +
.../apache/flink/runtime/blob/BlobServer.java | 65 +-
.../apache/flink/runtime/blob/BlobStore.java | 3 +-
.../apache/flink/runtime/blob/BlobUtils.java | 8 +-
.../flink/runtime/blob/FileSystemBlobStore.java | 53 +-
.../flink/runtime/blob/VoidBlobStore.java | 2 +-
.../HighAvailabilityServices.java | 13 +
.../runtime/highavailability/NonHaServices.java | 13 +
.../highavailability/RunningJobsRegistry.java | 66 ++
.../highavailability/ZookeeperHaServices.java | 104 ++-
.../highavailability/nonha/NonHaRegistry.java | 62 ++
.../runtime/jobmanager/OnCompletionActions.java | 3 +-
.../runtime/jobmaster/JobManagerRunner.java | 269 ++++--
.../runtime/jobmaster/JobManagerServices.java | 86 +-
.../flink/runtime/jobmaster/JobMaster.java | 831 +++++++------------
.../runtime/jobmaster/JobMasterGateway.java | 112 +--
.../jobmaster/MiniClusterJobDispatcher.java | 61 +-
.../jobmaster/message/ClassloadingProps.java | 1 -
.../message/DisposeSavepointResponse.java | 49 --
.../message/TriggerSavepointResponse.java | 74 --
.../apache/flink/runtime/rpc/RpcService.java | 4 +-
.../taskexecutor/JobManagerConnection.java | 25 +-
.../runtime/taskexecutor/TaskExecutor.java | 103 ++-
.../taskexecutor/rpc/RpcInputSplitProvider.java | 8 +-
.../rpc/RpcPartitionStateChecker.java | 8 +-
.../RpcResultPartitionConsumableNotifier.java | 7 +-
.../apache/flink/runtime/akka/AkkaUtils.scala | 6 +
.../TestingHighAvailabilityServices.java | 14 +
.../jobmaster/JobManagerRunnerMockTest.java | 56 +-
.../runtime/jobmaster/JobManagerRunnerTest.java | 24 +
30 files changed, 1184 insertions(+), 960 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 10b6304..3c32d77 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -335,4 +335,18 @@ public final class StringUtils {
return null;
}
}
+
+ public static boolean isNullOrWhitespaceOnly(String str) {
+ if (str == null || str.length() == 0) {
+ return true;
+ }
+
+ final int len = str.length();
+ for (int i = 0; i < len; i++) {
+ if (!Character.isWhitespace(str.charAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index b800500..33f9db7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -22,7 +22,11 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.NetUtils;
@@ -45,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
/**
* This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
@@ -96,9 +101,16 @@ public class BlobServer extends Thread implements BlobService {
* thrown if the BLOB server cannot bind to a free network port
*/
public BlobServer(Configuration config) throws IOException {
- checkNotNull(config, "Configuration");
+ this(config, createBlobStoreFromConfig(config));
+ }
- HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
+ public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException {
+ this(config, haServices.createBlobStore());
+ }
+
+ private BlobServer(Configuration config, BlobStore blobStore) throws IOException {
+ checkNotNull(config);
+ this.blobStore = checkNotNull(blobStore);
this.blobServiceConfiguration = config;
@@ -107,14 +119,6 @@ public class BlobServer extends Thread implements BlobService {
this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
LOG.info("Created BLOB server storage directory {}", storageDir);
- if (highAvailabilityMode == HighAvailabilityMode.NONE) {
- this.blobStore = new VoidBlobStore();
- } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
- this.blobStore = new FileSystemBlobStore(config);
- } else {
- throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + ".");
- }
-
// configure the maximum number of concurrent connections
final int maxConnections = config.getInteger(
ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
@@ -135,13 +139,7 @@ public class BlobServer extends Thread implements BlobService {
backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
}
- if (highAvailabilityMode == HighAvailabilityMode.NONE) {
- // Add shutdown hook to delete storage directory
- this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
- }
- else {
- this.shutdownHook = null;
- }
+ this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
@@ -451,4 +449,37 @@ public class BlobServer extends Thread implements BlobService {
}
}
+ private static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException {
+ HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
+
+ if (highAvailabilityMode == HighAvailabilityMode.NONE) {
+ return new VoidBlobStore();
+ } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
+ final String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+ if (isNullOrWhitespaceOnly(storagePath)) {
+ throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
+ HighAvailabilityOptions.HA_STORAGE_PATH);
+ }
+
+ final Path path;
+ try {
+ path = new Path(storagePath);
+ } catch (Exception e) {
+ throw new IOException("Invalid path for highly available storage (" +
+ HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+ }
+
+ final FileSystem fileSystem;
+ try {
+ fileSystem = path.getFileSystem();
+ } catch (Exception e) {
+ throw new IOException("Could not create FileSystem for highly available storage (" +
+ HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+ }
+
+ return new FileSystemBlobStore(fileSystem, storagePath);
+ } else {
+ throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + ".");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 1e72d91..7050338 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -25,7 +25,7 @@ import java.io.File;
/**
* A blob store.
*/
-interface BlobStore {
+public interface BlobStore {
/**
* Copies the local file to the blob store.
@@ -93,5 +93,4 @@ interface BlobStore {
* Cleans up the store and deletes all blobs.
*/
void cleanUp();
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index e74fa6f..136df09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import java.io.EOFException;
@@ -73,7 +74,7 @@ public class BlobUtils {
*/
static File initStorageDirectory(String storageDirectory) {
File baseDir;
- if (storageDirectory == null || storageDirectory.trim().isEmpty()) {
+ if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) {
baseDir = new File(System.getProperty("java.io.tmpdir"));
}
else {
@@ -81,10 +82,9 @@ public class BlobUtils {
}
File storageDir;
- final int MAX_ATTEMPTS = 10;
- int attempt;
- for(attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+ final int MAX_ATTEMPTS = 10;
+ for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
storageDir = new File(baseDir, String.format(
"blobStore-%s", UUID.randomUUID().toString()));
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index deba738..2c05002 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -20,12 +20,7 @@ package org.apache.flink.runtime.blob;
import com.google.common.io.Files;
-import org.apache.commons.lang3.StringUtils;
-
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.IOUtils;
@@ -38,7 +33,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.URI;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -47,25 +41,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>This is used in addition to the local blob storage for high availability.
*/
-class FileSystemBlobStore implements BlobStore {
+public class FileSystemBlobStore implements BlobStore {
private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class);
+ /** The file system in which blobs are stored */
+ private final FileSystem fileSystem;
+
/** The base path of the blob store */
private final String basePath;
- FileSystemBlobStore(Configuration config) throws IOException {
- String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
-
- if (storagePath == null || StringUtils.isBlank(storagePath)) {
- throw new IllegalConfigurationException("Missing high-availability storage path for metadata." +
- " Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
- }
+ public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOException {
+ this.fileSystem = checkNotNull(fileSystem);
+ this.basePath = checkNotNull(storagePath) + "/blob";
- this.basePath = storagePath + "/blob";
+ LOG.info("Creating highly available BLOB storage directory at {}", basePath);
- FileSystem.get(new Path(basePath).toUri()).mkdirs(new Path(basePath));
- LOG.info("Created blob directory {}.", basePath);
+ fileSystem.mkdirs(new Path(basePath));
+ LOG.debug("Created highly available BLOB storage directory at {}", basePath);
}
// - Put ------------------------------------------------------------------
@@ -81,9 +74,7 @@ class FileSystemBlobStore implements BlobStore {
}
private void put(File fromFile, String toBlobPath) throws Exception {
- try (OutputStream os = FileSystem.get(new URI(toBlobPath))
- .create(new Path(toBlobPath), true)) {
-
+ try (OutputStream os = fileSystem.create(new Path(toBlobPath), true)) {
LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
Files.copy(fromFile, os);
}
@@ -106,16 +97,15 @@ class FileSystemBlobStore implements BlobStore {
checkNotNull(toFile, "File");
if (!toFile.exists() && !toFile.createNewFile()) {
- throw new IllegalStateException("Failed to create target file to copy to");
+ throw new IOException("Failed to create target file to copy to");
}
- final URI fromUri = new URI(fromBlobPath);
final Path fromPath = new Path(fromBlobPath);
- if (FileSystem.get(fromUri).exists(fromPath)) {
- try (InputStream is = FileSystem.get(fromUri).open(fromPath)) {
- FileOutputStream fos = new FileOutputStream(toFile);
-
+ if (fileSystem.exists(fromPath)) {
+ try (InputStream is = fileSystem.open(fromPath);
+ FileOutputStream fos = new FileOutputStream(toFile))
+ {
LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);
IOUtils.copyBytes(is, fos); // closes the streams
}
@@ -145,17 +135,16 @@ class FileSystemBlobStore implements BlobStore {
private void delete(String blobPath) {
try {
LOG.debug("Deleting {}.", blobPath);
-
- FileSystem fs = FileSystem.get(new URI(blobPath));
+
Path path = new Path(blobPath);
- fs.delete(path, true);
+ fileSystem.delete(path, true);
// send a call to delete the directory containing the file. This will
// fail (and be ignored) when some files still exist.
try {
- fs.delete(path.getParent(), false);
- fs.delete(new Path(basePath), false);
+ fileSystem.delete(path.getParent(), false);
+ fileSystem.delete(new Path(basePath), false);
} catch (IOException ignored) {}
}
catch (Exception e) {
@@ -168,7 +157,7 @@ class FileSystemBlobStore implements BlobStore {
try {
LOG.debug("Cleaning up {}.", basePath);
- FileSystem.get(new URI(basePath)).delete(new Path(basePath), true);
+ fileSystem.delete(new Path(basePath), true);
}
catch (Exception e) {
LOG.error("Failed to clean up recovery directory.");
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index 1b71add..ece2ac1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -25,7 +25,7 @@ import java.io.File;
/**
* A blob store doing nothing.
*/
-class VoidBlobStore implements BlobStore {
+public class VoidBlobStore implements BlobStore {
@Override
public void put(File localFile, BlobKey blobKey) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index a26886a..5d78ffc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -19,11 +19,14 @@
package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import java.io.IOException;
+
/**
* This class gives access to all services needed for
*
@@ -72,4 +75,14 @@ public interface HighAvailabilityServices {
* Gets the submitted job graph store for the job manager
*/
SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
+
+ /**
+ * Gets the registry that holds information about whether jobs are currently running.
+ */
+ RunningJobsRegistry getRunningJobsRegistry() throws Exception;
+
+ /**
+ * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
+ */
+ BlobStore createBlobStore() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 2c6295c..d7fd2bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -19,8 +19,11 @@
package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -102,4 +105,14 @@ public class NonHaServices implements HighAvailabilityServices {
public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
return new StandaloneSubmittedJobGraphStore();
}
+
+ @Override
+ public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+ return new NonHaRegistry();
+ }
+
+ @Override
+ public BlobStore createBlobStore() {
+ return new VoidBlobStore();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
new file mode 100644
index 0000000..e7c131c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.IOException;
+
+/**
+ * This registry tracks if a certain job is running.
+ *
+ * <p>This registry is used in highly-available setups with multiple master nodes,
+ * to determine whether a new leader should attempt to recover a certain job (because the
+ * job is still running), or whether the job has already finished successfully (in case of a
+ * finite job) and the leader has only been granted leadership because the previous leader
+ * quit cleanly after the job was finished.
+ */
+public interface RunningJobsRegistry {
+
+ /**
+ * Marks a job as running.
+ *
+ * @param jobID The id of the job.
+ *
+ * @throws IOException Thrown when the communication with the highly-available storage or registry
+ * failed and could not be retried.
+ */
+ void setJobRunning(JobID jobID) throws IOException;
+
+ /**
+ * Marks a job as finished.
+ *
+ * @param jobID The id of the job.
+ *
+ * @throws IOException Thrown when the communication with the highly-available storage or registry
+ * failed and could not be retried.
+ */
+ void setJobFinished(JobID jobID) throws IOException;
+
+ /**
+ * Checks whether a job is running.
+ *
+ * @param jobID The id of the job to check.
+ * @return True if the job is still running, false otherwise.
+ *
+ * @throws IOException Thrown when the communication with the highly-available storage or registry
+ * failed and could not be retried.
+ */
+ boolean isJobRunning(JobID jobID) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index d25965d..d32068e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -19,8 +19,15 @@
package org.apache.flink.runtime.highavailability;
import org.apache.curator.framework.CuratorFramework;
+
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.FileSystemBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
@@ -28,16 +35,57 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.ZooKeeperUtils;
+import java.io.IOException;
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
/**
- * An implementation of the {@link HighAvailabilityServices} with zookeeper.
+ * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
+ * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
+ *
+ * <pre>
+ * /flink
+ * +/cluster_id_1/resource_manager_lock
+ * | |
+ * | +/job-id-1/job_manager_lock
+ * | | /checkpoints/latest
+ * | | /latest-1
+ * | | /latest-2
+ * | |
+ * | +/job-id-2/job_manager_lock
+ * |
+ * +/cluster_id_2/resource_manager_lock
+ * |
+ * +/job-id-1/job_manager_lock
+ * |/checkpoints/latest
+ * | /latest-1
+ * |/persisted_job_graph
+ * </pre>
+ *
+ * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
+ * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
+ * accommodate specific permissions.
+ *
+ * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster".
+ * This "cluster" can be either a standalone or containerized Flink cluster, or it can be a job
+ * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
+ *
+ * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
+ * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
+ *
+ * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
+ * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
+ * cluster and participate in the execution of the same set of jobs.
*/
public class ZookeeperHaServices implements HighAvailabilityServices {
- private static final String DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX = "/resource-manager";
+ private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
+
+ private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
+
+ // ------------------------------------------------------------------------
/** The ZooKeeper client to use */
private final CuratorFramework client;
@@ -54,24 +102,28 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
this.configuration = checkNotNull(configuration);
}
+ // ------------------------------------------------------------------------
+ // Services
+ // ------------------------------------------------------------------------
+
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
- return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+ return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
- return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathSuffixForJob(jobID));
+ return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
- return ZooKeeperUtils.createLeaderElectionService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+ return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
- return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathSuffixForJob(jobID));
+ return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
}
@Override
@@ -84,7 +136,43 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
}
- private static String getPathSuffixForJob(final JobID jobID) {
- return String.format("/job-managers/%s", jobID);
+ @Override
+ public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+ throw new UnsupportedOperationException("not yet implemented");
+ }
+
+ @Override
+ public BlobStore createBlobStore() throws IOException {
+ final String storagePath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+ if (isNullOrWhitespaceOnly(storagePath)) {
+ throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
+ HighAvailabilityOptions.HA_STORAGE_PATH);
+ }
+
+ final Path path;
+ try {
+ path = new Path(storagePath);
+ } catch (Exception e) {
+ throw new IOException("Invalid path for highly available storage (" +
+ HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+ }
+
+ final FileSystem fileSystem;
+ try {
+ fileSystem = path.getFileSystem();
+ } catch (Exception e) {
+ throw new IOException("Could not create FileSystem for highly available storage (" +
+ HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+ }
+
+ return new FileSystemBlobStore(fileSystem, storagePath);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static String getPathForJobManager(final JobID jobID) {
+ return "/" + jobID + JOB_MANAGER_LEADER_PATH;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
new file mode 100644
index 0000000..85dd711
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A registry for running jobs, not-highly available.
+ */
+public class NonHaRegistry implements RunningJobsRegistry {
+
+ /** The currently running jobs */
+ private final HashSet<JobID> running = new HashSet<>();
+
+ @Override
+ public void setJobRunning(JobID jobID) {
+ checkNotNull(jobID);
+
+ synchronized (running) {
+ running.add(jobID);
+ }
+ }
+
+ @Override
+ public void setJobFinished(JobID jobID) {
+ checkNotNull(jobID);
+
+ synchronized (running) {
+ running.remove(jobID);
+ }
+ }
+
+ @Override
+ public boolean isJobRunning(JobID jobID) {
+ checkNotNull(jobID);
+
+ synchronized (running) {
+ return running.contains(jobID);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 6de4253..25a2a66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -19,9 +19,8 @@
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-public interface OnCompletionActions extends FatalErrorHandler {
+public interface OnCompletionActions {
void jobFinished(JobExecutionResult result);
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index a096932..74ca6f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,26 +21,38 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The runner for the job manager. It deals with job level leader election and make underlying job manager
* properly reacted.
*/
-public class JobManagerRunner implements LeaderContender, OnCompletionActions {
+public class JobManagerRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
- private final Logger log = LoggerFactory.getLogger(JobManagerRunner.class);
+ private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class);
+
+ // ------------------------------------------------------------------------
/** Lock to ensure that this runner can deal with leader election event and job completion notifies simultaneously */
private final Object lock = new Object();
@@ -48,52 +60,140 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
/** The job graph needs to run */
private final JobGraph jobGraph;
- private final OnCompletionActions toNotify;
+ /** The listener to notify once the job completes - either successfully or unsuccessfully */
+ private final OnCompletionActions toNotifyOnComplete;
+
+ /** The handler to call in case of fatal (unrecoverable) errors */
+ private final FatalErrorHandler errorHandler;
/** Used to check whether a job needs to be run */
- private final SubmittedJobGraphStore submittedJobGraphStore;
+ private final RunningJobsRegistry runningJobsRegistry;
/** Leader election for this job */
private final LeaderElectionService leaderElectionService;
+ private final JobManagerServices jobManagerServices;
+
private final JobMaster jobManager;
+ private final JobManagerMetricGroup jobManagerMetricGroup;
+
/** flag marking the runner as shut down */
private volatile boolean shutdown;
+ // ------------------------------------------------------------------------
+
public JobManagerRunner(
- final JobGraph jobGraph,
- final Configuration configuration,
- final RpcService rpcService,
- final HighAvailabilityServices haServices,
- final OnCompletionActions toNotify) throws Exception
+ final JobGraph jobGraph,
+ final Configuration configuration,
+ final RpcService rpcService,
+ final HighAvailabilityServices haServices,
+ final OnCompletionActions toNotifyOnComplete,
+ final FatalErrorHandler errorHandler) throws Exception
{
this(jobGraph, configuration, rpcService, haServices,
- JobManagerServices.fromConfiguration(configuration), toNotify);
+ new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
+ toNotifyOnComplete, errorHandler);
}
public JobManagerRunner(
- final JobGraph jobGraph,
- final Configuration configuration,
- final RpcService rpcService,
- final HighAvailabilityServices haServices,
- final JobManagerServices jobManagerServices,
- final OnCompletionActions toNotify) throws Exception
+ final JobGraph jobGraph,
+ final Configuration configuration,
+ final RpcService rpcService,
+ final HighAvailabilityServices haServices,
+ final MetricRegistry metricRegistry,
+ final OnCompletionActions toNotifyOnComplete,
+ final FatalErrorHandler errorHandler) throws Exception
+ {
+ this(jobGraph, configuration, rpcService, haServices,
+ JobManagerServices.fromConfiguration(configuration, haServices),
+ metricRegistry,
+ toNotifyOnComplete, errorHandler);
+ }
+
+ /**
+ * Creates the JobManagerRunner together with the JobMaster it manages.
+ * <p>Exceptions that occur while creating the JobManager or JobManagerRunner are directly
+ * thrown and not reported to the given {@code FatalErrorHandler}.
+ *
+ * <p>This JobManagerRunner assumes that it owns the given {@code JobManagerServices}.
+ * It will shut them down on error and on calls to {@link #shutdown()}.
+ *
+ * @throws Exception Thrown if the runner cannot be set up, because either one of the
+ * required services could not be started, or the Job could not be initialized.
+ */
+ public JobManagerRunner(
+ final JobGraph jobGraph,
+ final Configuration configuration,
+ final RpcService rpcService,
+ final HighAvailabilityServices haServices,
+ final JobManagerServices jobManagerServices,
+ final MetricRegistry metricRegistry,
+ final OnCompletionActions toNotifyOnComplete,
+ final FatalErrorHandler errorHandler) throws Exception
{
- this.jobGraph = jobGraph;
- this.toNotify = toNotify;
- this.submittedJobGraphStore = haServices.getSubmittedJobGraphStore();
- this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
-
- this.jobManager = new JobMaster(
- jobGraph, configuration, rpcService, haServices,
- jobManagerServices.libraryCacheManager,
- jobManagerServices.restartStrategyFactory,
- jobManagerServices.savepointStore,
- jobManagerServices.timeout,
- new Scheduler(jobManagerServices.executorService),
- jobManagerServices.jobManagerMetricGroup,
- this);
+
+ JobManagerMetricGroup jobManagerMetrics = null;
+
+ // make sure we cleanly shut down our JobManager services if initialization fails
+ try {
+ this.jobGraph = checkNotNull(jobGraph);
+ this.toNotifyOnComplete = checkNotNull(toNotifyOnComplete);
+ this.errorHandler = checkNotNull(errorHandler);
+ this.jobManagerServices = checkNotNull(jobManagerServices);
+
+ checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
+
+ final String hostAddress = rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress();
+ jobManagerMetrics = new JobManagerMetricGroup(metricRegistry, hostAddress);
+ this.jobManagerMetricGroup = jobManagerMetrics;
+
+ // libraries and class loader first
+ final BlobLibraryCacheManager libraryCacheManager = jobManagerServices.libraryCacheManager;
+ try {
+ libraryCacheManager.registerJob(
+ jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
+ } catch (IOException e) {
+ throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
+ }
+
+ final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
+ if (userCodeLoader == null) {
+ throw new Exception("The user code class loader could not be initialized.");
+ }
+
+ // high availability services next
+ this.runningJobsRegistry = haServices.getRunningJobsRegistry();
+ this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
+
+ // now start the JobManager
+ this.jobManager = new JobMaster(
+ jobGraph, configuration,
+ rpcService,
+ haServices,
+ jobManagerServices.executorService,
+ jobManagerServices.libraryCacheManager,
+ jobManagerServices.restartStrategyFactory,
+ jobManagerServices.rpcAskTimeout,
+ jobManagerMetrics,
+ this,
+ this,
+ userCodeLoader);
+ }
+ catch (Throwable t) {
+ // clean up everything
+ try {
+ jobManagerServices.shutdown();
+ } catch (Throwable tt) {
+ log.error("Error while shutting down JobManager services", tt);
+ }
+
+ if (jobManagerMetrics != null) {
+ jobManagerMetrics.close();
+ }
+
+ throw new JobExecutionException(jobGraph.getJobID(), "Could not set up JobManager", t);
+ }
}
//----------------------------------------------------------------------------------------------
@@ -101,9 +201,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
//----------------------------------------------------------------------------------------------
public void start() throws Exception {
- jobManager.init();
- jobManager.start();
-
try {
leaderElectionService.start(this);
}
@@ -114,11 +211,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
}
public void shutdown() {
- shutdown(new Exception("The JobManager runner is shutting down"));
- }
-
- public void shutdown(Throwable cause) {
- // TODO what is the cause used for ?
shutdownInternally();
}
@@ -129,12 +221,29 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
if (leaderElectionService != null) {
try {
leaderElectionService.stop();
- } catch (Exception e) {
- log.error("Could not properly shutdown the leader election service.");
+ } catch (Throwable t) {
+ log.error("Could not properly shutdown the leader election service", t);
}
}
- jobManager.shutDown();
+ try {
+ jobManager.shutDown();
+ } catch (Throwable t) {
+ log.error("Error shutting down JobManager", t);
+ }
+
+ try {
+ jobManagerServices.shutdown();
+ } catch (Throwable t) {
+ log.error("Error shutting down JobManager services", t);
+ }
+
+ // make all registered metrics go away
+ try {
+ jobManagerMetricGroup.close();
+ } catch (Throwable t) {
+ log.error("Error while unregistering metrics", t);
+ }
}
}
@@ -148,11 +257,12 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
@Override
public void jobFinished(JobExecutionResult result) {
try {
+ unregisterJobFromHighAvailability();
shutdownInternally();
}
finally {
- if (toNotify != null) {
- toNotify.jobFinished(result);
+ if (toNotifyOnComplete != null) {
+ toNotifyOnComplete.jobFinished(result);
}
}
}
@@ -163,11 +273,12 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
@Override
public void jobFailed(Throwable cause) {
try {
+ unregisterJobFromHighAvailability();
shutdownInternally();
}
finally {
- if (toNotify != null) {
- toNotify.jobFailed(cause);
+ if (toNotifyOnComplete != null) {
+ toNotifyOnComplete.jobFailed(cause);
}
}
}
@@ -178,11 +289,12 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
@Override
public void jobFinishedByOther() {
try {
+ unregisterJobFromHighAvailability();
shutdownInternally();
}
finally {
- if (toNotify != null) {
- toNotify.jobFinishedByOther();
+ if (toNotifyOnComplete != null) {
+ toNotifyOnComplete.jobFinishedByOther();
}
}
}
@@ -192,18 +304,43 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
*/
@Override
public void onFatalError(Throwable exception) {
- // first and in any case, notify our handler, so it can react fast
+ // we log first to make sure an explaining message goes into the log
+ // we even guard the log statement here to increase chances that the error handler
+ // gets the notification on hard critical situations like out-of-memory errors
+ try {
+ log.error("JobManager runner encountered a fatal error.", exception);
+ } catch (Throwable ignored) {}
+
+ // in any case, notify our handler, so it can react fast
try {
- if (toNotify != null) {
- toNotify.onFatalError(exception);
+ if (errorHandler != null) {
+ errorHandler.onFatalError(exception);
}
}
finally {
- log.error("JobManager runner encountered a fatal error.", exception);
+ // the shutdown may not even needed any more, if the fatal error
+ // handler kills the process. that is fine, a process kill cleans up better than anything.
shutdownInternally();
}
}
+ /**
+ * Marks this runner's job as not running. Other JobManagers will not recover the job
+ * after this call.
+ *
+ * <p>This method never throws an exception.
+ */
+ private void unregisterJobFromHighAvailability() {
+ try {
+ runningJobsRegistry.setJobFinished(jobGraph.getJobID());
+ }
+ catch (Throwable t) {
+ log.error("Could not un-register from high-availability services job {} ({}). " +
+ "Other JobManagers may attempt to recover it and re-execute it.",
+ jobGraph.getName(), jobGraph.getJobID(), t);
+ }
+ }
+
//----------------------------------------------------------------------------------------------
// Leadership methods
//----------------------------------------------------------------------------------------------
@@ -223,15 +360,25 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
// it's okay that job manager wait for the operation complete
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+ boolean jobRunning;
+ try {
+ jobRunning = runningJobsRegistry.isJobRunning(jobGraph.getJobID());
+ } catch (Throwable t) {
+ log.error("Could not access status (running/finished) of job {}. " +
+ "Falling back to assumption that job is running and attempting recovery...",
+ jobGraph.getJobID(), t);
+ jobRunning = true;
+ }
+
// Double check the leadership after we confirm that, there is a small chance that multiple
// job managers schedule the same job after if they try to recover at the same time.
// This will eventually be noticed, but can not be ruled out from the beginning.
if (leaderElectionService.hasLeadership()) {
- if (isJobFinishedByOthers()) {
+ if (jobRunning) {
+ jobManager.start(leaderSessionID);
+ } else {
log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID());
jobFinishedByOther();
- } else {
- jobManager.getSelf().startJob(leaderSessionID);
}
}
}
@@ -248,7 +395,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
log.info("JobManager for job {} ({}) was revoked leadership at {}.",
jobGraph.getName(), jobGraph.getJobID(), getAddress());
- jobManager.getSelf().suspendJob(new Exception("JobManager is no longer the leader."));
+ jobManager.getSelf().suspendExecution(new Exception("JobManager is no longer the leader."));
}
}
@@ -263,11 +410,9 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
onFatalError(exception);
}
- @VisibleForTesting
- boolean isJobFinishedByOthers() {
- // TODO: Fix
- return false;
- }
+ //----------------------------------------------------------------------------------------------
+ // Testing
+ //----------------------------------------------------------------------------------------------
@VisibleForTesting
boolean isShutdown() {
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index e6beba6..fff75d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -19,13 +19,20 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.util.ExceptionUtils;
+
+import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -40,34 +47,81 @@ public class JobManagerServices {
public final RestartStrategyFactory restartStrategyFactory;
- public final SavepointStore savepointStore;
-
- public final Time timeout;
-
- public final JobManagerMetricGroup jobManagerMetricGroup;
+ public final Time rpcAskTimeout;
public JobManagerServices(
ExecutorService executorService,
BlobLibraryCacheManager libraryCacheManager,
RestartStrategyFactory restartStrategyFactory,
- SavepointStore savepointStore,
- Time timeout,
- JobManagerMetricGroup jobManagerMetricGroup) {
+ Time rpcAskTimeout) {
this.executorService = checkNotNull(executorService);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
- this.savepointStore = checkNotNull(savepointStore);
- this.timeout = checkNotNull(timeout);
- this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
+ this.rpcAskTimeout = checkNotNull(rpcAskTimeout);
+ }
+
+ /**
+ * Shuts down all contained services.
+ * <p>This method makes sure all services are closed or shut down, even when an exception occurred
+ * in the shutdown of one component. The first encountered exception is thrown, with successive
+ * exceptions added as suppressed exceptions.
+ *
+ * @throws Exception The first Exception encountered during shutdown.
+ */
+ public void shutdown() throws Exception {
+ Throwable firstException = null;
+
+ try {
+ executorService.shutdownNow();
+ } catch (Throwable t) {
+ firstException = t;
+ }
+
+ try {
+ libraryCacheManager.shutdown();
+ }
+ catch (Throwable t) {
+ if (firstException == null) {
+ firstException = t;
+ } else {
+ firstException.addSuppressed(t);
+ }
+ }
+
+ if (firstException != null) {
+ ExceptionUtils.rethrowException(firstException, "Error while shutting down JobManager services");
+ }
}
// ------------------------------------------------------------------------
// Creating the components from a configuration
// ------------------------------------------------------------------------
- public static JobManagerServices fromConfiguration(Configuration config) throws Exception {
- // TODO not yet implemented
- return null;
+
+ public static JobManagerServices fromConfiguration(
+ Configuration config,
+ HighAvailabilityServices haServices) throws Exception {
+
+ final BlobServer blobServer = new BlobServer(config, haServices);
+
+ final long cleanupInterval = config.getLong(
+ ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+ ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+ final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval);
+
+ final FiniteDuration timeout;
+ try {
+ timeout = AkkaUtils.getTimeout(config);
+ } catch (NumberFormatException e) {
+ throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage());
+ }
+
+ return new JobManagerServices(
+ new ForkJoinPool(),
+ libraryCacheManager,
+ RestartStrategyFactory.createRestartStrategyFactory(config),
+ Time.of(timeout.length(), timeout.unit()));
}
}