You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:58:56 UTC
[05/47] flink git commit: [FLINK-2354] [runtime] Add job graph and
checkpoint recovery
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
new file mode 100644
index 0000000..660f8bc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Stores {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Each job graph creates ZNode:
+ * <pre>
+ * +----O /flink/jobgraphs/<job-id> 1 [persistent]
+ * .
+ * .
+ * .
+ * +----O /flink/jobgraphs/<job-id> N [persistent]
+ * </pre>
+ *
+ * <p>The root path is watched to detect concurrent modifications in corner situations where
+ * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener}
+ * to react to such situations.
+ */
+public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class);
+
+ /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
+ private final Object cacheLock = new Object();
+
+ /** Client (not a namespace facade) */
+ private final CuratorFramework client;
+
+ /** The set of IDs of all added job graphs. */
+ private final Set<JobID> addedJobGraphs = new HashSet<>();
+
+ /** Completed checkpoints in ZooKeeper */
+ private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
+
+ /**
+ * Cache to monitor all children. This is used to detect races with other instances working
+ * on the same state.
+ */
+ private final PathChildrenCache pathCache;
+
+ /** The external listener to be notified on races. */
+ private SubmittedJobGraphListener jobGraphListener;
+
+ /** Flag indicating whether this instance is running. */
+ private boolean isRunning;
+
+ public ZooKeeperSubmittedJobGraphStore(
+ CuratorFramework client,
+ String currentJobsPath,
+ StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception {
+
+ checkNotNull(currentJobsPath, "Current jobs path");
+ checkNotNull(stateHandleProvider, "State handle provider");
+
+ // Keep a reference to the original client and not the namespace facade. The namespace
+ // facade cannot be closed.
+ this.client = checkNotNull(client, "Curator client");
+
+ // Ensure that the job graphs path exists
+ client.newNamespaceAwareEnsurePath(currentJobsPath)
+ .ensure(client.getZookeeperClient());
+
+ // All operations will have the path as root
+ client = client.usingNamespace(client.getNamespace() + currentJobsPath);
+
+ this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(client, stateHandleProvider);
+
+ this.pathCache = new PathChildrenCache(client, "/", false);
+ pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
+ }
+
+ @Override
+ public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
+ synchronized (cacheLock) {
+ if (!isRunning) {
+ this.jobGraphListener = jobGraphListener;
+
+ pathCache.start();
+
+ isRunning = true;
+ }
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ synchronized (cacheLock) {
+ if (isRunning) {
+ jobGraphListener = null;
+
+ pathCache.close();
+
+ client.close();
+
+ isRunning = false;
+ }
+ }
+ }
+
+ @Override
+ public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+ synchronized (cacheLock) {
+ verifyIsRunning();
+
+ List<Tuple2<StateHandle<SubmittedJobGraph>, String>> submitted;
+
+ while (true) {
+ try {
+ submitted = jobGraphsInZooKeeper.getAll();
+ break;
+ }
+ catch (ConcurrentModificationException e) {
+ LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
+ }
+ }
+
+ if (submitted.size() != 0) {
+ List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
+
+ for (Tuple2<StateHandle<SubmittedJobGraph>, String> jobStateHandle : submitted) {
+ SubmittedJobGraph jobGraph = jobStateHandle
+ .f0.getState(ClassLoader.getSystemClassLoader());
+
+ addedJobGraphs.add(jobGraph.getJobId());
+
+ jobGraphs.add(jobGraph);
+ }
+
+ LOG.info("Recovered {} job graphs: {}.", jobGraphs.size(), jobGraphs);
+ return jobGraphs;
+ }
+ else {
+ LOG.info("No job graph to recover.");
+ return Collections.emptyList();
+ }
+ }
+ }
+
+ @Override
+ public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+ checkNotNull(jobId, "Job ID");
+ String path = getPathForJob(jobId);
+
+ synchronized (cacheLock) {
+ verifyIsRunning();
+
+ try {
+ StateHandle<SubmittedJobGraph> jobStateHandle = jobGraphsInZooKeeper.get(path);
+
+ SubmittedJobGraph jobGraph = jobStateHandle
+ .getState(ClassLoader.getSystemClassLoader());
+
+ addedJobGraphs.add(jobGraph.getJobId());
+
+ LOG.info("Recovered {}.", jobGraph);
+
+ return Option.apply(jobGraph);
+ }
+ catch (KeeperException.NoNodeException ignored) {
+ return Option.empty();
+ }
+ }
+ }
+
+ @Override
+ public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+ checkNotNull(jobGraph, "Job graph");
+ String path = getPathForJob(jobGraph.getJobId());
+
+ boolean success = false;
+
+ while (!success) {
+ synchronized (cacheLock) {
+ verifyIsRunning();
+
+ int currentVersion = jobGraphsInZooKeeper.exists(path);
+
+ if (currentVersion == -1) {
+ try {
+ jobGraphsInZooKeeper.add(path, jobGraph);
+
+ addedJobGraphs.add(jobGraph.getJobId());
+
+ LOG.info("Added {} to ZooKeeper.", jobGraph);
+
+ success = true;
+ }
+ catch (KeeperException.NodeExistsException ignored) {
+ }
+ }
+ else if (addedJobGraphs.contains(jobGraph.getJobId())) {
+ try {
+ jobGraphsInZooKeeper.replace(path, currentVersion, jobGraph);
+ LOG.info("Updated {} in ZooKeeper.", jobGraph);
+
+ success = true;
+ }
+ catch (KeeperException.NoNodeException ignored) {
+ }
+ }
+ else {
+ throw new IllegalStateException("Oh, no. Trying to update a graph you didn't " +
+ "#getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
+ }
+ }
+ }
+ }
+
+ @Override
+ public void removeJobGraph(JobID jobId) throws Exception {
+ checkNotNull(jobId, "Job ID");
+ String path = getPathForJob(jobId);
+
+ synchronized (cacheLock) {
+ if (addedJobGraphs.contains(jobId)) {
+ jobGraphsInZooKeeper.removeAndDiscardState(path);
+
+ addedJobGraphs.remove(jobId);
+ LOG.info("Removed job graph {} from ZooKeeper.", jobId);
+ }
+ }
+ }
+
+ /**
+ * Monitors ZooKeeper for changes.
+ *
+ * <p>Detects modifications from other job managers in corner situations. The event
+ * notifications fire for changes from this job manager as well.
+ */
+ private final class SubmittedJobGraphsPathCacheListener implements PathChildrenCacheListener {
+
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+ throws Exception {
+
+ if (LOG.isDebugEnabled()) {
+ if (event.getData() != null) {
+ LOG.debug("Received {} event (path: {})", event.getType(), event.getData().getPath());
+ }
+ else {
+ LOG.debug("Received {} event", event.getType());
+ }
+ }
+
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ synchronized (cacheLock) {
+ try {
+ JobID jobId = fromEvent(event);
+ if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
+ try {
+ // Whoa! This has been added by someone else. Or we were fast
+ // to remove it (false positive).
+ jobGraphListener.onAddedJobGraph(jobId);
+ }
+ catch (Throwable t) {
+ LOG.error("Error in callback", t);
+ }
+ }
+ }
+ catch (Exception e) {
+ LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
+ }
+ }
+
+ break;
+
+ case CHILD_UPDATED:
+ // Nothing to do
+ break;
+
+ case CHILD_REMOVED:
+ synchronized (cacheLock) {
+ try {
+ JobID jobId = fromEvent(event);
+ if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
+ try {
+ // Oh oh. Someone else removed one of our job graphs. Mean!
+ jobGraphListener.onRemovedJobGraph(jobId);
+ }
+ catch (Throwable t) {
+ LOG.error("Error in callback", t);
+ }
+ }
+
+ break;
+ }
+ catch (Exception e) {
+ LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
+ }
+ }
+ break;
+
+ case CONNECTION_SUSPENDED:
+ LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
+ "graphs are not monitored (temporarily).");
+
+ case CONNECTION_LOST:
+ LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
+ "graphs are not monitored (permanently).");
+ break;
+
+ case CONNECTION_RECONNECTED:
+ LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
+ "graphs are monitored again.");
+
+ case INITIALIZED:
+ LOG.info("SubmittedJobGraphsPathCacheListener initialized");
+ break;
+ }
+ }
+
+ /**
+ * Returns a JobID for the event's path.
+ */
+ private JobID fromEvent(PathChildrenCacheEvent event) {
+ return JobID.fromHexString(ZKPaths.getNodeFromPath(event.getData().getPath()));
+ }
+ }
+
+ /**
+ * Verifies that the state is running.
+ */
+ private void verifyIsRunning() {
+ checkState(isRunning, "Not running. Forgot to call start()?");
+ }
+
+ /**
+ * Returns the JobID as a String (with leading slash).
+ */
+ public static String getPathForJob(JobID jobId) {
+ checkNotNull(jobId, "Job ID");
+ return String.format("/%s", jobId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
index b6223ee..6cba141 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -67,4 +67,5 @@ public interface LeaderElectionService {
* @return true if the associated {@link LeaderContender} is the leader, otherwise false
*/
boolean hasLeadership();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index ae3f0e6..811037c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -43,6 +43,7 @@ import java.util.UUID;
* ZooKeeper as well.
*/
public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener {
+
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
/** Client to the ZooKeeper quorum */
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
new file mode 100644
index 0000000..7aa1ccf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+public enum StateBackend {
+ JOBMANAGER, FILESYSTEM;
+
+ /**
+ * Returns the configured {@link StateBackend}.
+ *
+ * @param config The config to parse
+ * @return Configured state backend or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not
+ * configured.
+ */
+ public static StateBackend fromConfig(Configuration config) {
+ return StateBackend.valueOf(config.getString(
+ ConfigConstants.STATE_BACKEND,
+ ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
new file mode 100644
index 0000000..0086ac6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProviderFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import java.io.Serializable;
+
+/**
+ * State handler provider factory.
+ *
+ * <p>This is going to be superseded soon.
+ */
+public class StateHandleProviderFactory {
+
+ /**
+ * Creates a {@link org.apache.flink.runtime.state.FileStateHandle.FileStateHandleProvider} at
+ * the configured recovery path.
+ */
+ public static <T extends Serializable> StateHandleProvider<T> createRecoveryFileStateHandleProvider(
+ Configuration config) {
+
+ StateBackend stateBackend = StateBackend.fromConfig(config);
+
+ if (stateBackend == StateBackend.FILESYSTEM) {
+ String recoveryPath = config.getString(
+ ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
+
+ if (recoveryPath.equals("")) {
+ throw new IllegalConfigurationException("Missing recovery path. Specify via " +
+ "configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
+ }
+ else {
+ return FileStateHandle.createProvider(recoveryPath);
+ }
+ }
+ else {
+ throw new IllegalConfigurationException("Unexpected state backend configuration " +
+ stateBackend);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
deleted file mode 100644
index 5f867a5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderElectionUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
-
-/**
- * Utility class to help working with {@link LeaderElectionService} class.
- */
-public final class LeaderElectionUtils {
-
- /**
- * Creates a {@link LeaderElectionService} based on the provided {@link Configuration} object.
- *
- * @param configuration Configuration object
- * @return {@link LeaderElectionService} which was created based on the provided Configuration
- * @throws Exception
- */
- public static LeaderElectionService createLeaderElectionService(Configuration configuration) throws Exception {
- RecoveryMode recoveryMode = RecoveryMode.valueOf(configuration.getString(
- ConfigConstants.RECOVERY_MODE,
- ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase()
- );
-
- LeaderElectionService leaderElectionService;
-
- switch(recoveryMode) {
- case STANDALONE:
- leaderElectionService = new StandaloneLeaderElectionService();
- break;
- case ZOOKEEPER:
- leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration);
- break;
- default:
- throw new Exception("Unknown RecoveryMode " + recoveryMode);
- }
-
- return leaderElectionService;
- }
-
- /**
- * Private constructor to prevent instantiation.
- */
- private LeaderElectionUtils() {
- throw new RuntimeException();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index d2d3945..79b9b7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -21,19 +21,27 @@ package org.apache.flink.runtime.util;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.state.StateHandleProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Utility class to work with Apache Zookeeper for Flink runtime.
- */
-public final class ZooKeeperUtils {
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ZooKeeperUtils {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
@@ -47,8 +55,10 @@ public final class ZooKeeperUtils {
public static CuratorFramework startCuratorFramework(Configuration configuration) {
String zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
- if(zkQuorum == null || zkQuorum.equals("")) {
- throw new RuntimeException("No valid ZooKeeper quorum has been specified.");
+ if (zkQuorum == null || zkQuorum.equals("")) {
+ throw new RuntimeException("No valid ZooKeeper quorum has been specified. " +
+ "You can specify the quorum via the configuration key '" +
+ ConfigConstants.ZOOKEEPER_QUORUM_KEY + "'.");
}
int sessionTimeout = configuration.getInteger(
@@ -59,7 +69,7 @@ public final class ZooKeeperUtils {
ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
- int retryWait = configuration.getInteger (
+ int retryWait = configuration.getInteger(
ConfigConstants.ZOOKEEPER_RETRY_WAIT,
ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
@@ -88,14 +98,10 @@ public final class ZooKeeperUtils {
}
/**
- * Returns whether high availability is enabled (<=> ZooKeeper quorum configured).
+ * Returns whether {@link RecoveryMode#ZOOKEEPER} is configured.
*/
- public static boolean isZooKeeperHighAvailabilityEnabled(Configuration flinkConf) {
- String recoveryMode = flinkConf.getString(
- ConfigConstants.RECOVERY_MODE,
- ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
-
- return recoveryMode.equals(RecoveryMode.ZOOKEEPER.name());
+ public static boolean isZooKeeperRecoveryMode(Configuration flinkConf) {
+ return RecoveryMode.fromConfig(flinkConf).equals(RecoveryMode.ZOOKEEPER);
}
/**
@@ -125,7 +131,7 @@ public final class ZooKeeperUtils {
* @throws Exception
*/
public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
- Configuration configuration) throws Exception{
+ Configuration configuration) throws Exception {
CuratorFramework client = startCuratorFramework(configuration);
String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
@@ -134,7 +140,8 @@ public final class ZooKeeperUtils {
}
/**
- * Creates a {@link ZooKeeperLeaderElectionService} instance.
+ * Creates a {@link ZooKeeperLeaderElectionService} instance and a new {@link
+ * CuratorFramework} client.
*
* @param configuration {@link Configuration} object containing the configuration values
* @return {@link ZooKeeperLeaderElectionService} instance.
@@ -142,8 +149,24 @@ public final class ZooKeeperUtils {
*/
public static ZooKeeperLeaderElectionService createLeaderElectionService(
Configuration configuration) throws Exception {
+
CuratorFramework client = startCuratorFramework(configuration);
+ return createLeaderElectionService(client, configuration);
+ }
+
+ /**
+ * Creates a {@link ZooKeeperLeaderElectionService} instance.
+ *
+ * @param client The {@link CuratorFramework} ZooKeeper client to use
+ * @param configuration {@link Configuration} object containing the configuration values
+ * @return {@link ZooKeeperLeaderElectionService} instance.
+ * @throws Exception
+ */
+ public static ZooKeeperLeaderElectionService createLeaderElectionService(
+ CuratorFramework client,
+ Configuration configuration) throws Exception {
+
String latchPath = configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
@@ -153,6 +176,89 @@ public final class ZooKeeperUtils {
}
/**
+ * Creates a {@link ZooKeeperSubmittedJobGraphStore} instance.
+ *
+ * @param client The {@link CuratorFramework} ZooKeeper client to use
+ * @param configuration {@link Configuration} object
+ * @return {@link ZooKeeperSubmittedJobGraphStore} instance
+ */
+ public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
+ CuratorFramework client,
+ Configuration configuration) throws Exception {
+
+ checkNotNull(configuration, "Configuration");
+
+ StateHandleProvider<SubmittedJobGraph> stateHandleProvider =
+ StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
+
+ // ZooKeeper submitted jobs root dir
+ String zooKeeperSubmittedJobsPath = configuration.getString(
+ ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+ ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+
+ return new ZooKeeperSubmittedJobGraphStore(
+ client, zooKeeperSubmittedJobsPath, stateHandleProvider);
+ }
+
+ /**
+ * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
+ *
+ * @param client The {@link CuratorFramework} ZooKeeper client to use
+ * @param configuration {@link Configuration} object
+ * @param jobId ID of job to create the instance for
+ * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain
+ * @param userClassLoader User code class loader
+ * @return {@link ZooKeeperCompletedCheckpointStore} instance
+ */
+ public static CompletedCheckpointStore createCompletedCheckpoints(
+ CuratorFramework client,
+ Configuration configuration,
+ JobID jobId,
+ int maxNumberOfCheckpointsToRetain,
+ ClassLoader userClassLoader) throws Exception {
+
+ checkNotNull(configuration, "Configuration");
+
+ StateHandleProvider<CompletedCheckpoint> stateHandleProvider =
+ StateHandleProviderFactory.createRecoveryFileStateHandleProvider(configuration);
+
+ String completedCheckpointsPath = configuration.getString(
+ ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
+ ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
+
+ completedCheckpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
+
+ return new ZooKeeperCompletedCheckpointStore(
+ maxNumberOfCheckpointsToRetain,
+ userClassLoader,
+ client,
+ completedCheckpointsPath,
+ stateHandleProvider);
+ }
+
+ /**
+ * Creates a {@link ZooKeeperCheckpointIDCounter} instance.
+ *
+ * @param client The {@link CuratorFramework} ZooKeeper client to use
+ * @param configuration {@link Configuration} object
+ * @param jobId ID of job to create the instance for
+ * @return {@link ZooKeeperCheckpointIDCounter} instance
+ */
+ public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(
+ CuratorFramework client,
+ Configuration configuration,
+ JobID jobId) throws Exception {
+
+ String checkpointIdCounterPath = configuration.getString(
+ ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+ ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+
+ checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
+
+ return new ZooKeeperCheckpointIDCounter(client, checkpointIdCounterPath);
+ }
+
+ /**
* Private constructor to prevent instantiation.
*/
private ZooKeeperUtils() {
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
new file mode 100644
index 0000000..936fe1b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * <p>Added state is persisted via {@link StateHandle}s, which in turn are written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper
+ * small. ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs.
+ *
+ * <p>State modifications require some care, because it is possible that certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * <p>ZooKeeper holds the ground truth about state handles, i.e. the following holds:
+ *
+ * <pre>
+ * State handle in ZooKeeper => State handle exists
+ * </pre>
+ *
+ * But not:
+ *
+ * <pre>
+ * State handle exists => State handle in ZooKeeper
+ * </pre>
+ *
+ * There can be lingering state handles when failures happen during operation. They
+ * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+ /** Curator ZooKeeper client */
+ private final CuratorFramework client;
+
+ /** State handle provider */
+ private final StateHandleProvider<T> stateHandleProvider;
+
+ /**
+ * Creates a {@link ZooKeeperStateHandleStore}.
+ *
+ * @param client The Curator ZooKeeper client. <strong>Important:</strong> It is
+ * expected that the client's namespace ensures that the root
+ * path is exclusive for all state handles managed by this
+ * instance, e.g. <code>client.usingNamespace("/stateHandles")</code>
+ * @param stateHandleProvider The state handle provider for the state
+ */
+ public ZooKeeperStateHandleStore(
+ CuratorFramework client,
+ StateHandleProvider<T> stateHandleProvider) {
+
+ this.client = checkNotNull(client, "Curator client");
+ this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider");
+ }
+
+ /**
+ * Creates a state handle and stores it in ZooKeeper with create mode {@link
+ * CreateMode#PERSISTENT}.
+ *
+ * @see #add(String, Serializable, CreateMode)
+ */
+ public StateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
+ return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+ }
+
+ /**
+ * Creates a state handle and stores it in ZooKeeper.
+ *
+ * <p><strong>Important</strong>: This will <em>not</em> store the actual state in
+ * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection
+ * makes sure that data in ZooKeeper is small.
+ *
+ * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and
+ * start with a '/')
+ * @param state State to be added
+ * @param createMode The create mode for the new path in ZooKeeper
+ * @return Created {@link StateHandle}
+ * @throws Exception If a ZooKeeper or state handle operation fails
+ */
+ public StateHandle<T> add(String pathInZooKeeper, T state, CreateMode createMode) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+ checkNotNull(state, "State");
+
+ // Create the state handle. Nothing persisted yet.
+ StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state);
+
+ boolean success = false;
+
+ try {
+ // Serialize the state handle. This writes the state to the backend.
+ byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle);
+
+ // Write state handle (not the actual state) to ZooKeeper. This is expected to be
+ // smaller than the state itself. This level of indirection makes sure that data in
+ // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
+ // the state can be larger.
+ client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStateHandle);
+
+ success = true;
+
+ return stateHandle;
+ }
+ finally {
+ if (!success) {
+ // Cleanup the state handle if it was not written to ZooKeeper.
+ if (stateHandle != null) {
+ stateHandle.discardState();
+ }
+ }
+ }
+ }
+
+ /**
+ * Replaces a state handle in ZooKeeper and discards the old state handle.
+ *
+ * @param pathInZooKeeper Destination path in ZooKeeper (expected to exist and start with a '/')
+ * @param expectedVersion Expected version of the node to replace
+ * @param state The new state to replace the old one
+ * @throws Exception If a ZooKeeper or state handle operation fails
+ */
+ public void replace(String pathInZooKeeper, int expectedVersion, T state) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+ checkNotNull(state, "State");
+
+ StateHandle<T> oldStateHandle = get(pathInZooKeeper);
+
+ StateHandle<T> stateHandle = stateHandleProvider.createStateHandle(state);
+
+ boolean success = false;
+
+ try {
+ // Serialize the new state handle. This writes the state to the backend.
+ byte[] serializedStateHandle = InstantiationUtil.serializeObject(stateHandle);
+
+ // Replace state handle in ZooKeeper.
+ client.setData()
+ .withVersion(expectedVersion)
+ .forPath(pathInZooKeeper, serializedStateHandle);
+
+ success = true;
+ }
+ finally {
+ if (success) {
+ oldStateHandle.discardState();
+ }
+ else {
+ stateHandle.discardState();
+ }
+ }
+ }
+
+ /**
+ * Returns the version of the node if it exists or <code>-1</code> if it doesn't.
+ *
+ * @param pathInZooKeeper Path in ZooKeeper to check
+ * @return Version of the ZNode if the path exists, <code>-1</code> otherwise.
+ * @throws Exception If the ZooKeeper operation fails
+ */
+ public int exists(String pathInZooKeeper) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+ Stat stat = client.checkExists().forPath(pathInZooKeeper);
+
+ if (stat != null) {
+ return stat.getVersion();
+ }
+
+ return -1;
+ }
+
+ /**
+ * Gets a state handle from ZooKeeper.
+ *
+ * @param pathInZooKeeper Path in ZooKeeper to get the state handle from (expected to
+ * exist and start with a '/').
+ * @return The state handle
+ * @throws Exception If a ZooKeeper or state handle operation fails
+ */
+ @SuppressWarnings("unchecked")
+ public StateHandle<T> get(String pathInZooKeeper) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+ byte[] data = client.getData().forPath(pathInZooKeeper);
+
+ return (StateHandle<T>) InstantiationUtil
+ .deserializeObject(data, ClassLoader.getSystemClassLoader());
+ }
+
+ /**
+ * Gets all available state handles from ZooKeeper.
+ *
+ * <p>If there is a concurrent modification, the operation is retried until it succeeds.
+ *
+ * @return All state handles from ZooKeeper.
+ * @throws Exception If a ZooKeeper or state handle operation fails
+ */
+ @SuppressWarnings("unchecked")
+ public List<Tuple2<StateHandle<T>, String>> getAll() throws Exception {
+ final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();
+
+ boolean success = false;
+
+ retry:
+ while (!success) {
+ // Initial cVersion (number of changes to the children of this node)
+ int initialCVersion = client.checkExists().forPath("/").getCversion();
+
+ List<String> children = client.getChildren().forPath("/");
+
+ for (String path : children) {
+ path = "/" + path;
+
+ try {
+ final StateHandle<T> stateHandle = get(path);
+ stateHandles.add(new Tuple2<>(stateHandle, path));
+ }
+ catch (KeeperException.NoNodeException ignored) {
+ // Concurrent deletion, retry
+ continue retry;
+ }
+ }
+
+ int finalCVersion = client.checkExists().forPath("/").getCversion();
+
+ // Check for concurrent modifications
+ success = initialCVersion == finalCVersion;
+ }
+
+ return stateHandles;
+ }
+
+ /**
+ * Gets all available state handles from ZooKeeper sorted by name (ascending).
+ *
+ * <p>If there is a concurrent modification, the operation is retried until it succeeds.
+ *
+ * @return All state handles in ZooKeeper.
+ * @throws Exception If a ZooKeeper or state handle operation fails
+ */
+ @SuppressWarnings("unchecked")
+ public List<Tuple2<StateHandle<T>, String>> getAllSortedByName() throws Exception {
+ final List<Tuple2<StateHandle<T>, String>> stateHandles = new ArrayList<>();
+
+ boolean success = false;
+
+ retry:
+ while (!success) {
+ // Initial cVersion (number of changes to the children of this node)
+ int initialCVersion = client.checkExists().forPath("/").getCversion();
+
+ List<String> children = ZKPaths.getSortedChildren(
+ client.getZookeeperClient().getZooKeeper(),
+ ZKPaths.fixForNamespace(client.getNamespace(), "/"));
+
+ for (String path : children) {
+ path = "/" + path;
+
+ try {
+ final StateHandle<T> stateHandle = get(path);
+ stateHandles.add(new Tuple2<>(stateHandle, path));
+ }
+ catch (KeeperException.NoNodeException ignored) {
+ // Concurrent deletion, retry
+ continue retry;
+ }
+ }
+
+ int finalCVersion = client.checkExists().forPath("/").getCversion();
+
+ // Check for concurrent modifications
+ success = initialCVersion == finalCVersion;
+ }
+
+ return stateHandles;
+ }
+
+ /**
+ * Removes a state handle from ZooKeeper.
+ *
+ * <p><strong>Important</strong>: this does not discard the state handle. If you want to
+ * discard the state handle call {@link #removeAndDiscardState(String)}.
+ *
+ * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
+ * @throws Exception If the ZooKeeper operation fails
+ */
+ public void remove(String pathInZooKeeper) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+ client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
+ }
+
+ /**
+ * Removes a state handle from ZooKeeper asynchronously.
+ *
+ * <p><strong>Important</strong>: this does not discard the state handle. If you want to
+ * discard the state handle call {@link #removeAndDiscardState(String)}.
+ *
+ * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/')
+ * @param callback The callback after the operation finishes
+ * @throws Exception If the ZooKeeper operation fails
+ */
+ public void remove(String pathInZooKeeper, BackgroundCallback callback) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+ checkNotNull(callback, "Background callback");
+
+ client.delete().deletingChildrenIfNeeded().inBackground(callback).forPath(pathInZooKeeper);
+ }
+
+ /**
+ * Discards a state handle and removes it from ZooKeeper.
+ *
+ * <p>If you only want to remove the state handle in ZooKeeper call {@link #remove(String)}.
+ *
+ * @param pathInZooKeeper Path of state handle to discard (expected to start with a '/')
+ * @throws Exception If the ZooKeeper or state handle operation fails
+ */
+ public void removeAndDiscardState(String pathInZooKeeper) throws Exception {
+ checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
+
+ StateHandle<T> stateHandle = get(pathInZooKeeper);
+
+ // Delete the state handle from ZooKeeper first
+ client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
+
+ // Discard the state handle only after it has been successfully deleted from ZooKeeper.
+ // Otherwise we might enter an illegal state after failures (with a state handle in
+ // ZooKeeper, which has already been discarded).
+ stateHandle.discardState();
+ }
+
+ /**
+ * Discards all available state handles and removes them from ZooKeeper.
+ *
+ * @throws Exception If a ZooKeeper or state handle operation fails
+ */
+ public void removeAndDiscardAllState() throws Exception {
+ final List<Tuple2<StateHandle<T>, String>> allStateHandles = getAll();
+
+ ZKPaths.deleteChildren(
+ client.getZookeeperClient().getZooKeeper(),
+ ZKPaths.fixForNamespace(client.getNamespace(), "/"),
+ false);
+
+ // Discard the state handles only after they have been successfully deleted from ZooKeeper.
+ for (Tuple2<StateHandle<T>, String> stateHandleAndPath : allStateHandles) {
+ stateHandleAndPath.f0.discardState();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 75ad20f..67d7a06 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.jobmanager
import akka.actor.ActorRef
-
+import org.apache.flink.runtime.akka.ListeningBehaviour
/**
* Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor
@@ -27,11 +27,20 @@ import akka.actor.ActorRef
* Additionally, it stores whether the job was started in the detached mode. Detached means that
* the submitting actor does not wait for the job result once the job has terminated.
*
+ * Important: This class is serializable, but needs to be deserialized in the context of an actor
+ * system in order to resolve the client [[ActorRef]]. It is possible to serialize the Akka URL
+ * manually, but it is cumbersome and complicates testing in certain scenarios, where you need to
+ * make sure to resolve the correct [[ActorRef]]s when submitting jobs (RepointableActorRef vs.
+ * RemoteActorRef).
+ *
* @param client Actor which submitted the job
* @param start Starting time
*/
-class JobInfo(val client: ActorRef, val start: Long,
- val sessionTimeout: Long) {
+class JobInfo(
+ val client: ActorRef,
+ val listeningBehaviour: ListeningBehaviour,
+ val start: Long,
+ val sessionTimeout: Long) extends Serializable {
var sessionAlive = sessionTimeout > 0
@@ -49,12 +58,16 @@ class JobInfo(val client: ActorRef, val start: Long,
}
}
+ override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)"
+
def setLastActive() =
lastActive = System.currentTimeMillis()
}
object JobInfo{
- def apply(client: ActorRef, start: Long,
- sessionTimeout: Long) =
- new JobInfo(client, start, sessionTimeout)
+ def apply(
+ client: ActorRef,
+ listeningBehaviour: ListeningBehaviour,
+ start: Long,
+ sessionTimeout: Long) = new JobInfo(client, listeningBehaviour, start, sessionTimeout)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 95637bb..f3e4054 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,31 +19,39 @@
package org.apache.flink.runtime.jobmanager
import java.io.{File, IOException}
-import java.lang.reflect.{InvocationTargetException, Constructor}
+import java.lang.reflect.{Constructor, InvocationTargetException}
import java.net.InetSocketAddress
import java.util.UUID
import akka.actor.Status.Failure
-import akka.actor.{Props, Terminated, PoisonPill, ActorRef, ActorSystem}
+import akka.actor._
import akka.pattern.ask
-
import grizzled.slf4j.Logger
-
import org.apache.flink.api.common.{ExecutionConfig, JobID}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
+import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.BlobServer
+import org.apache.flink.runtime.checkpoint.{CheckpointRecoveryFactory, StandaloneCheckpointRecoveryFactory, ZooKeeperCheckpointRecoveryFactory}
import org.apache.flink.runtime.client._
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
-import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService}
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages._
+import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
-import org.apache.flink.runtime.messages.accumulators._
+import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified}
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
import org.apache.flink.runtime.messages.webmonitor._
import org.apache.flink.runtime.process.ProcessReaper
@@ -51,25 +59,16 @@ import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util._
-import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
-import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessageFilter}
-import org.apache.flink.runtime.LogMessages
-import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
-import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
-import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
-import org.apache.flink.util.{NetUtils, SerializedValue, ExceptionUtils, InstantiationUtil}
+import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode}
+import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
import scala.language.postfixOps
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
/**
@@ -110,17 +109,22 @@ class JobManager(
protected val delayBetweenRetries: Long,
protected val timeout: FiniteDuration,
protected val mode: StreamingMode,
- protected val leaderElectionService: LeaderElectionService)
+ protected val leaderElectionService: LeaderElectionService,
+ protected val submittedJobGraphs : SubmittedJobGraphStore,
+ protected val checkpointRecoveryFactory : CheckpointRecoveryFactory)
extends FlinkActor
 with LeaderSessionMessageFilter // mixin order is important, we want filtering after logging
with LogMessages // mixin order is important, we want first logging
- with LeaderContender {
+ with LeaderContender
+ with SubmittedJobGraphListener {
override val log = Logger(getClass)
/** Either running or not yet archived jobs (session hasn't been ended). */
protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
+ protected val recoveryMode = RecoveryMode.fromConfig(flinkConfiguration)
+
var leaderSessionID: Option[UUID] = None
/**
@@ -138,6 +142,22 @@ class JobManager(
"start.", e)
throw new RuntimeException("Could not start the leader election service.", e)
}
+
+ try {
+ submittedJobGraphs.start(this)
+ } catch {
+ case e: Exception =>
+ log.error("Could not start the submitted job graphs service.", e)
+ throw new RuntimeException("Could not start the submitted job graphs service.", e)
+ }
+
+ try {
+ checkpointRecoveryFactory.start()
+ } catch {
+ case e: Exception =>
+ log.error("Could not start the checkpoint recovery service.", e)
+ throw new RuntimeException("Could not start the checkpoint recovery service.", e)
+ }
}
override def postStop(): Unit = {
@@ -159,6 +179,18 @@ class JobManager(
case e: Exception => log.error("Could not properly shutdown the leader election service.")
}
+ try {
+ submittedJobGraphs.stop()
+ } catch {
+ case e: Exception => log.error("Could not properly stop the submitted job graphs service.")
+ }
+
+ try {
+ checkpointRecoveryFactory.stop()
+ } catch {
+ case e: Exception => log.error("Could not properly stop the checkpoint recovery service.")
+ }
+
if (archive != ActorRef.noSender) {
archive ! decorateMessage(PoisonPill)
}
@@ -191,12 +223,21 @@ class JobManager(
// confirming the leader session ID might be blocking, thus do it in a future
future{
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID.orNull)
+
+ // TODO (critical next step) This needs to be more flexible and robust (e.g. wait for task
+ // managers etc.)
+ if (recoveryMode != RecoveryMode.STANDALONE) {
+ context.system.scheduler.scheduleOnce(new FiniteDuration(delayBetweenRetries,
+ MILLISECONDS), self, decorateMessage(RecoverAllJobs))(context.dispatcher)
+ }
}(context.dispatcher)
case RevokeLeadership =>
log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
- cancelAndClearEverything(new Exception("JobManager is no longer the leader."))
+ future {
+ cancelAndClearEverything(new Exception("JobManager is no longer the leader."))
+ }(context.dispatcher)
// disconnect the registered task managers
instanceManager.getAllRegisteredInstances.asScala.foreach {
@@ -269,7 +310,62 @@ class JobManager(
sender ! decorateMessage(instanceManager.getTotalNumberOfSlots)
case SubmitJob(jobGraph, listeningBehaviour) =>
- submitJob(jobGraph, listeningBehaviour)
+ val client = sender()
+
+ val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
+ jobGraph.getSessionTimeout)
+
+ future {
+ submitJob(jobGraph, jobInfo)
+ }(context.dispatcher)
+
+ case RecoverJob(jobId) =>
+ future {
+ // The ActorRef, which is part of the submitted job graph can only be deserialized in the
+ // scope of an actor system.
+ akka.serialization.JavaSerializer.currentSystem.withValue(
+ context.system.asInstanceOf[ExtendedActorSystem]) {
+
+ log.info(s"Attempting to recover job $jobId.")
+
+ val jobGraph = submittedJobGraphs.recoverJobGraph(jobId)
+
+ if (jobGraph.isDefined) {
+ if (!leaderElectionService.hasLeadership()) {
+ // we've lost leadership. mission: abort.
+ log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
+ }
+ else {
+ recoverJobGraph(jobGraph.get)
+ }
+ }
+ else {
+ log.warn(s"Failed to recover job graph ${jobId}.")
+ }
+ }
+ }(context.dispatcher)
+
+ case RecoverAllJobs =>
+ future {
+ // The ActorRef, which is part of the submitted job graph can only be deserialized in the
+ // scope of an actor system.
+ akka.serialization.JavaSerializer.currentSystem.withValue(
+ context.system.asInstanceOf[ExtendedActorSystem]) {
+
+ val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
+
+ if (!leaderElectionService.hasLeadership()) {
+ // we've lost leadership. mission: abort.
+ log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
+ s"jobs.")
+ }
+ else {
+ log.debug(s"Attempting to recover ${jobGraphs.size} job graphs.")
+
+ jobGraphs.foreach(recoverJobGraph(_))
+ }
+ }
+ }(context.dispatcher)
case CancelJob(jobID) =>
log.info(s"Trying to cancel job with ID $jobID.")
@@ -377,10 +473,27 @@ class JobManager(
if (newJobStatus.isTerminalState()) {
jobInfo.end = timeStamp
- // is the client waiting for the job result?
- if (jobInfo.client != ActorRef.noSender) {
- newJobStatus match {
- case JobStatus.FINISHED =>
+ future {
+ // TODO If removing the JobGraph from the SubmittedJobGraphsStore fails, the job will
+ // linger around and potentially be recovered at a later time. There is nothing we
+ // can do about that, but it should be communicated with the Client.
+ if (jobInfo.sessionAlive) {
+ jobInfo.setLastActive()
+ val lastActivity = jobInfo.lastActive
+ context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+ // remove only if no activity occurred in the meantime
+ if (lastActivity == jobInfo.lastActive) {
+ removeJob(jobID)
+ }
+ }
+ } else {
+ removeJob(jobID)
+ }
+
+ // is the client waiting for the job result?
+ if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
+ newJobStatus match {
+ case JobStatus.FINISHED =>
try {
val accumulatorResults = executionGraph.getAccumulatorsSerialized()
val result = new SerializedJobExecutionResult(
@@ -398,47 +511,37 @@ class JobManager(
jobInfo.client ! decorateMessage(JobResultFailure(
new SerializedThrowable(exception)))
}
- case JobStatus.CANCELED =>
- // the error may be packed as a serialized throwable
- val unpackedError = SerializedThrowable.get(
- error, executionGraph.getUserClassLoader())
-
- jobInfo.client ! decorateMessage(JobResultFailure(
- new SerializedThrowable(
- new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))
-
- case JobStatus.FAILED =>
- val unpackedError = SerializedThrowable.get(
- error, executionGraph.getUserClassLoader())
-
- jobInfo.client ! decorateMessage(JobResultFailure(
- new SerializedThrowable(
- new JobExecutionException(jobID, "Job execution failed.", unpackedError))))
-
- case x =>
- val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
- jobInfo.client ! decorateMessage(JobResultFailure(
- new SerializedThrowable(exception)))
- throw exception
- }
- }
- if (jobInfo.sessionAlive) {
- jobInfo.setLastActive()
- val lastActivity = jobInfo.lastActive
- context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
- // remove only if no activity occurred in the meantime
- if (lastActivity == jobInfo.lastActive) {
- removeJob(jobID)
+ case JobStatus.CANCELED =>
+ // the error may be packed as a serialized throwable
+ val unpackedError = SerializedThrowable.get(
+ error, executionGraph.getUserClassLoader())
+
+ jobInfo.client ! decorateMessage(JobResultFailure(
+ new SerializedThrowable(
+ new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))
+
+ case JobStatus.FAILED =>
+ val unpackedError = SerializedThrowable.get(
+ error, executionGraph.getUserClassLoader())
+
+ jobInfo.client ! decorateMessage(JobResultFailure(
+ new SerializedThrowable(
+ new JobExecutionException(jobID, "Job execution failed.", unpackedError))))
+
+ case x =>
+ val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
+ jobInfo.client ! decorateMessage(JobResultFailure(
+ new SerializedThrowable(exception)))
+ throw exception
}
}
- } else {
- removeJob(jobID)
- }
-
+ }(context.dispatcher)
}
case None =>
- removeJob(jobID)
+ future {
+ removeJob(jobID)
+ }(context.dispatcher)
}
case ScheduleOrUpdateConsumers(jobId, partitionId) =>
@@ -600,11 +703,12 @@ class JobManager(
* graph and the execution vertices are queued for scheduling.
*
* @param jobGraph representing the Flink job
- * @param listeningBehaviour specifies the listening behaviour of the sender.
+ * @param jobInfo the job info
+ * @param isRecovery Flag indicating whether this is a recovery or initial submission
*/
- private def submitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour): Unit = {
+ private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
if (jobGraph == null) {
- sender() ! decorateMessage(JobResultFailure(
+ jobInfo.client ! decorateMessage(JobResultFailure(
new SerializedThrowable(
new JobSubmissionException(null, "JobGraph must not be null.")
)
@@ -615,7 +719,7 @@ class JobManager(
val jobName = jobGraph.getName
var executionGraph: ExecutionGraph = null
- log.info(s"Received job ${jobId} (${jobName}).")
+ log.info(s"Submitting job $jobId ($jobName)" + (if (isRecovery) " (Recovery)" else "") + ".")
try {
// Important: We need to make sure that the library registration is the first action,
@@ -628,7 +732,7 @@ class JobManager(
catch {
case t: Throwable =>
throw new JobSubmissionException(jobId,
- "Cannot set up the user code libraries: " + t.getMessage, t)
+ "Cannot set up the user code libraries: " + t.getMessage, t)
}
val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
@@ -641,18 +745,10 @@ class JobManager(
throw new JobSubmissionException(jobId, "The given job is empty")
}
- val client = if(listeningBehaviour == ListeningBehaviour.DETACHED) {
- // The client does not want to receive the SerializedJobExecutionResult
- ActorRef.noSender
- } else {
- // Send the job execution result back to the sender
- sender
- }
-
// see if there already exists an ExecutionGraph for the corresponding job ID
executionGraph = currentJobs.get(jobGraph.getJobID) match {
- case Some((graph, jobInfo)) =>
- jobInfo.setLastActive()
+ case Some((graph, currentJobInfo)) =>
+ currentJobInfo.setLastActive()
graph
case None =>
val graph = new ExecutionGraph(
@@ -664,11 +760,7 @@ class JobManager(
jobGraph.getUserJarBlobKeys,
jobGraph.getClasspaths,
userCodeLoader)
- val jobInfo = JobInfo(
- client,
- System.currentTimeMillis(),
- jobGraph.getSessionTimeout)
- currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
+
graph
}
@@ -682,7 +774,7 @@ class JobManager(
executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
executionGraph.setScheduleMode(jobGraph.getScheduleMode())
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
-
+
try {
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph))
}
@@ -691,7 +783,7 @@ class JobManager(
log.warn("Cannot create JSON plan for job", t)
executionGraph.setJsonPlan("{}")
}
-
+
// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
if (log.isDebugEnabled) {
@@ -701,62 +793,67 @@ class JobManager(
val numSlots = scheduler.getTotalNumberOfSlots()
for (vertex <- jobGraph.getVertices.asScala) {
-
val executableClass = vertex.getInvokableClassName
if (executableClass == null || executableClass.length == 0) {
throw new JobSubmissionException(jobId,
s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.")
}
- if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
- vertex.setParallelism(numSlots)
- }
+ if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+ vertex.setParallelism(numSlots)
+ }
- try {
- vertex.initializeOnMaster(userCodeLoader)
- }
- catch {
+ try {
+ vertex.initializeOnMaster(userCodeLoader)
+ }
+ catch {
case t: Throwable =>
throw new JobExecutionException(jobId,
"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage, t)
- }
- }
+ }
+ }
- // topologically sort the job vertices and attach the graph to the existing one
- val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
- if (log.isDebugEnabled) {
- log.debug(s"Adding ${sortedTopology.size()} vertices from " +
- s"job graph ${jobId} (${jobName}).")
- }
- executionGraph.attachJobGraph(sortedTopology)
+ // topologically sort the job vertices and attach the graph to the existing one
+ val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
+ if (log.isDebugEnabled) {
+ log.debug(s"Adding ${sortedTopology.size()} vertices from " +
+ s"job graph ${jobId} (${jobName}).")
+ }
+ executionGraph.attachJobGraph(sortedTopology)
- if (log.isDebugEnabled) {
- log.debug("Successfully created execution graph from job " +
- s"graph ${jobId} (${jobName}).")
- }
+ if (log.isDebugEnabled) {
+ log.debug("Successfully created execution graph from job " +
+ s"graph ${jobId} (${jobName}).")
+ }
- // configure the state checkpointing
- val snapshotSettings = jobGraph.getSnapshotSettings
- if (snapshotSettings != null) {
+ // configure the state checkpointing
+ val snapshotSettings = jobGraph.getSnapshotSettings
+ if (snapshotSettings != null) {
+ val jobId = jobGraph.getJobID()
- val idToVertex: JobVertexID => ExecutionJobVertex = id => {
- val vertex = executionGraph.getJobVertex(id)
- if (vertex == null) {
- throw new JobSubmissionException(jobId,
- "The snapshot checkpointing settings refer to non-existent vertex " + id)
- }
- vertex
- }
+ val idToVertex: JobVertexID => ExecutionJobVertex = id => {
+ val vertex = executionGraph.getJobVertex(id)
+ if (vertex == null) {
+ throw new JobSubmissionException(jobId,
+ "The snapshot checkpointing settings refer to non-existent vertex " + id)
+ }
+ vertex
+ }
- val triggerVertices: java.util.List[ExecutionJobVertex] =
+ val triggerVertices: java.util.List[ExecutionJobVertex] =
snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava
- val ackVertices: java.util.List[ExecutionJobVertex] =
+ val ackVertices: java.util.List[ExecutionJobVertex] =
snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava
- val confirmVertices: java.util.List[ExecutionJobVertex] =
+ val confirmVertices: java.util.List[ExecutionJobVertex] =
snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
+ val completedCheckpoints = checkpointRecoveryFactory
+ .createCompletedCheckpoints(jobId, userCodeLoader)
+
+ val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)
+
executionGraph.enableSnapshotCheckpointing(
snapshotSettings.getCheckpointInterval,
snapshotSettings.getCheckpointTimeout,
@@ -764,23 +861,39 @@ class JobManager(
ackVertices,
confirmVertices,
context.system,
- leaderSessionID.orNull)
+ leaderSessionID.orNull,
+ checkpointIdCounter,
+ completedCheckpoints,
+ recoveryMode)
}
// get notified about job status changes
executionGraph.registerJobStatusListener(
new AkkaActorGateway(self, leaderSessionID.orNull))
- if (listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
+ if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
// the sender wants to be notified about state changes
- val gateway = new AkkaActorGateway(sender(), leaderSessionID.orNull)
+ val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)
executionGraph.registerExecutionListener(gateway)
executionGraph.registerJobStatusListener(gateway)
}
+ if (isRecovery) {
+ executionGraph.restoreLatestCheckpointedState()
+ }
+ else {
+ submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
+ }
+
+ // Add the job graph only after everything is finished. Otherwise there can be races in
+ // tests, which check the currentJobs (for example before killing a JM).
+ if (!currentJobs.contains(jobId)) {
+ currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
+ }
+
// done with submitting the job
- sender() ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
+ jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
}
catch {
case t: Throwable =>
@@ -799,33 +912,61 @@ class JobManager(
new JobExecutionException(jobId, s"Failed to submit job ${jobId} (${jobName})", t)
}
- sender() ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))
+ jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))
return
}
- // NOTE: Scheduling the job for execution is a separate action from the job submission.
- // The success of submitting the job must be independent from the success of scheduling
- // the job.
- try {
- log.info(s"Scheduling job ${executionGraph.getJobName}.")
- executionGraph.scheduleForExecution(scheduler)
- }
- catch {
- case t: Throwable => try {
- executionGraph.fail(t)
+ if (leaderElectionService.hasLeadership) {
+      // There is a small chance that multiple job managers schedule the same job if they
+ // try to recover at the same time. This will eventually be noticed, but can not be ruled
+ // out from the beginning.
+
+ // NOTE: Scheduling the job for execution is a separate action from the job submission.
+ // The success of submitting the job must be independent from the success of scheduling
+ // the job.
+ try {
+ log.info(s"Scheduling job $jobId ($jobName).")
+
+ executionGraph.scheduleForExecution(scheduler)
}
catch {
- case tt: Throwable => {
- log.error("Error while marking ExecutionGraph as failed.", tt)
+ case t: Throwable => try {
+ executionGraph.fail(t)
+ }
+ catch {
+ case tt: Throwable => {
+ log.error("Error while marking ExecutionGraph as failed.", tt)
+ }
}
}
}
+ else {
+ // Remove the job graph. Otherwise it will be lingering around and possibly removed from
+ // ZooKeeper by this JM.
+ currentJobs.remove(jobId)
+
+ log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
+ "this. I am not scheduling the job for execution.")
+ }
+ }
+ }
+
+ /**
+ * Submits the job if it is not already one of our current jobs.
+ *
+ * @param jobGraph Job to recover
+ */
+ private def recoverJobGraph(jobGraph: SubmittedJobGraph): Unit = {
+ if (!currentJobs.contains(jobGraph.getJobId)) {
+ future {
+ submitJob(jobGraph.getJobGraph(), jobGraph.getJobInfo(), isRecovery = true)
+ }(context.dispatcher)
}
}
/**
* Dedicated handler for checkpoint messages.
- *
+ *
* @param actorMessage The checkpoint actor message.
*/
private def handleCheckpointMessage(actorMessage: AbstractCheckpointMessage): Unit = {
@@ -836,13 +977,15 @@ class JobManager(
case Some((graph, _)) =>
val coordinator = graph.getCheckpointCoordinator()
if (coordinator != null) {
- try {
- coordinator.receiveAcknowledgeMessage(ackMessage)
- }
- catch {
- case t: Throwable =>
- log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t)
- }
+ future {
+ try {
+ coordinator.receiveAcknowledgeMessage(ackMessage)
+ }
+ catch {
+ case t: Throwable =>
+ log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t)
+ }
+ }(context.dispatcher)
}
else {
log.error(
@@ -1020,30 +1163,46 @@ class JobManager(
}
/**
- * Removes the job and sends it to the MemoryArchivist
+ * Removes the job and sends it to the MemoryArchivist.
+ *
+ * This should be called asynchronously. Removing the job from the [[SubmittedJobGraphStore]]
+ * might block. Therefore be careful not to block the actor thread.
+ *
* @param jobID ID of the job to remove and archive
*/
private def removeJob(jobID: JobID): Unit = {
currentJobs.synchronized {
- currentJobs.remove(jobID) match {
+ // Don't remove the job yet...
+ currentJobs.get(jobID) match {
case Some((eg, _)) =>
try {
+ // ...otherwise, we can have lingering resources when there is a concurrent shutdown
+          // and the ZooKeeper client is closed. Not removing the job immediately allows the
+ // shutdown to release all resources.
+ submittedJobGraphs.removeJobGraph(jobID)
+ } catch {
+ case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t)
+ }
+
+ try {
eg.prepareForArchiving()
+
archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
} catch {
case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
"archiving.", t)
}
+ currentJobs.remove(jobID)
case None =>
}
+ }
- try {
- libraryCacheManager.unregisterJob(jobID)
- } catch {
- case t: Throwable =>
- log.error(s"Could not properly unregister job $jobID form the library cache.", t)
- }
+ try {
+ libraryCacheManager.unregisterJob(jobID)
+ } catch {
+ case t: Throwable =>
+        log.error(s"Could not properly unregister job $jobID from the library cache.", t)
}
}
@@ -1053,17 +1212,21 @@ class JobManager(
* @param cause Cause for the cancelling.
*/
private def cancelAndClearEverything(cause: Throwable) {
- for((jobID, (eg, jobInfo)) <- currentJobs) {
+ for ((jobID, (eg, jobInfo)) <- currentJobs) {
+ try {
+ submittedJobGraphs.removeJobGraph(jobID)
+ }
+ catch {
+ case t: Throwable => {
+ log.error("Error during submitted job graph clean up.", t)
+ }
+ }
+
eg.fail(cause)
- if(jobInfo.client != ActorRef.noSender) {
+ if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
jobInfo.client ! decorateMessage(
- Failure(
- new JobExecutionException(
- jobID,
- "All jobs are cancelled and cleared.",
- cause)
- ))
+ Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
}
}
@@ -1079,6 +1242,25 @@ class JobManager(
self ! decorateMessage(RevokeLeadership)
}
+ override def onAddedJobGraph(jobId: JobID): Unit = {
+ if (leaderSessionID.isDefined && !currentJobs.contains(jobId)) {
+ self ! decorateMessage(RecoverJob(jobId))
+ }
+ }
+
+ override def onRemovedJobGraph(jobId: JobID): Unit = {
+ if (leaderSessionID.isDefined) {
+ currentJobs.get(jobId).foreach(
+ job =>
+ future {
+ // Fail the execution graph
+ job._1.fail(new IllegalStateException("Another JobManager removed the job from " +
+ "ZooKeeper."))
+ }(context.dispatcher)
+ )
+ }
+ }
+
override def getAddress: String = {
AkkaUtils.getAkkaURL(context.system, self)
}
@@ -1166,7 +1348,7 @@ object JobManager {
System.exit(STARTUP_FAILURE_RETURN_CODE)
}
- if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) {
+ if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
// address and will not be reachable from anyone remote
if (listeningPort != 0) {
val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
@@ -1227,7 +1409,7 @@ object JobManager {
*
* @param configuration The configuration object for the JobManager.
* @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an
- * additional TaskManager in the same process.
+ * an additional TaskManager in the same process.
* @param streamingMode The streaming mode to run the system in (streaming vs. batch-only)
* @param listeningAddress The hostname where the JobManager should listen for messages.
* @param listeningPort The port where the JobManager should listen for messages.
@@ -1480,7 +1662,7 @@ object JobManager {
// high availability mode
val port: Int =
- if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) {
+ if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
LOG.info("Starting JobManager in High-Availability Mode")
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
@@ -1524,7 +1706,9 @@ object JobManager {
Long, // delay between retries
FiniteDuration, // timeout
Int, // number of archived jobs
- LeaderElectionService) = {
+ LeaderElectionService,
+ SubmittedJobGraphStore,
+ CheckpointRecoveryFactory) = {
val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -1588,10 +1772,31 @@ object JobManager {
}
}
- val leaderElectionService = leaderElectionServiceOption match {
- case Some(les) => les
- case None => LeaderElectionUtils.createLeaderElectionService(configuration)
- }
+ // Create recovery related components
+ val (leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) =
+ RecoveryMode.fromConfig(configuration) match {
+ case RecoveryMode.STANDALONE =>
+ val leaderElectionService = leaderElectionServiceOption match {
+ case Some(les) => les
+ case None => new StandaloneLeaderElectionService()
+ }
+
+ (leaderElectionService,
+ new StandaloneSubmittedJobGraphStore(),
+ new StandaloneCheckpointRecoveryFactory())
+
+ case RecoveryMode.ZOOKEEPER =>
+ val client = ZooKeeperUtils.startCuratorFramework(configuration)
+
+ val leaderElectionService = leaderElectionServiceOption match {
+ case Some(les) => les
+ case None => ZooKeeperUtils.createLeaderElectionService(client, configuration)
+ }
+
+ (leaderElectionService,
+ ZooKeeperUtils.createSubmittedJobGraphs(client, configuration),
+ new ZooKeeperCheckpointRecoveryFactory(client, configuration))
+ }
(executionContext,
instanceManager,
@@ -1599,9 +1804,11 @@ object JobManager {
libraryCacheManager,
executionRetries,
delayBetweenRetries,
- timeout,
- archiveCount,
- leaderElectionService)
+ timeout,
+ archiveCount,
+ leaderElectionService,
+ submittedJobGraphs,
+ checkpointRecoveryFactory)
}
/**
@@ -1633,6 +1840,7 @@ object JobManager {
jobManagerClass,
archiveClass)
}
+
/**
* Starts the JobManager and job archiver based on the given configuration, in the
* given actor system.
@@ -1646,28 +1854,30 @@ object JobManager {
* @param streamingMode The mode to run the system in (streaming vs. batch-only)
* @param jobManagerClass The class of the JobManager to be started
* @param archiveClass The class of the MemoryArchivist to be started
- *
+ *
* @return A tuple of references (JobManager Ref, Archiver Ref)
*/
def startJobManagerActors(
- configuration: Configuration,
- actorSystem: ActorSystem,
- jobMangerActorName: Option[String],
- archiveActorName: Option[String],
- streamingMode: StreamingMode,
- jobManagerClass: Class[_ <: JobManager],
- archiveClass: Class[_ <: MemoryArchivist])
- : (ActorRef, ActorRef) = {
+ configuration: Configuration,
+ actorSystem: ActorSystem,
+ jobMangerActorName: Option[String],
+ archiveActorName: Option[String],
+ streamingMode: StreamingMode,
+ jobManagerClass: Class[_ <: JobManager],
+ archiveClass: Class[_ <: MemoryArchivist])
+ : (ActorRef, ActorRef) = {
val (executionContext,
- instanceManager,
- scheduler,
- libraryCacheManager,
- executionRetries,
- delayBetweenRetries,
- timeout,
- archiveCount,
- leaderElectionService) = createJobManagerComponents(
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ executionRetries,
+ delayBetweenRetries,
+ timeout,
+ archiveCount,
+ leaderElectionService,
+ submittedJobGraphs,
+ checkpointRecoveryFactory) = createJobManagerComponents(
configuration,
None)
@@ -1691,7 +1901,9 @@ object JobManager {
delayBetweenRetries,
timeout,
streamingMode,
- leaderElectionService)
+ leaderElectionService,
+ submittedJobGraphs,
+ checkpointRecoveryFactory)
val jobManager: ActorRef = jobMangerActorName match {
case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index c29df88..d776622 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -66,6 +66,18 @@ object JobManagerMessages {
extends RequiresLeaderSessionID
/**
+ * Triggers the recovery of the job with the given ID.
+ *
+ * @param jobId ID of the job to recover
+ */
+ case class RecoverJob(jobId: JobID) extends RequiresLeaderSessionID
+
+ /**
+ * Triggers recovery of all available jobs.
+ */
+ case class RecoverAllJobs() extends RequiresLeaderSessionID
+
+ /**
* Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is
* sent back to the sender as a [[CancellationResponse]] message.
*
@@ -354,6 +366,10 @@ object JobManagerMessages {
// --------------------------------------------------------------------------
// Utility methods to allow simpler case object access from Java
// --------------------------------------------------------------------------
+
+ def getRequestJobStatus(jobId : JobID) : AnyRef = {
+ RequestJobStatus(jobId)
+ }
def getRequestNumberRegisteredTaskManager : AnyRef = {
RequestNumberRegisteredTaskManager
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 29add0e..2df3437 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -94,9 +94,7 @@ abstract class FlinkMiniCluster(
implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
- val recoveryMode = RecoveryMode.valueOf(configuration.getString(
- ConfigConstants.RECOVERY_MODE,
- ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase)
+ val recoveryMode = RecoveryMode.fromConfig(configuration)
val numJobManagers = getNumberOfJobManagers