You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:08 UTC
[03/16] flink git commit: [FLINK-6136] Separate EmbeddedHaServices
and StandaloneHaServices
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
deleted file mode 100644
index ab1ce47..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability.nonha;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-
-import java.util.HashMap;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A registry for running jobs, not-highly available.
- *
- * <p>The scheduling status of each job is kept only in an in-memory {@link HashMap},
- * and every access synchronizes on that map.
- */
-public class NonHaRegistry implements RunningJobsRegistry {
-
- /** The recorded scheduling status per job; this map is also the monitor guarding all accesses */
- private final HashMap<JobID, JobSchedulingStatus> jobStatus = new HashMap<>();
-
- /** Records the given (non-null) job as {@code RUNNING}. */
- @Override
- public void setJobRunning(JobID jobID) {
- checkNotNull(jobID);
-
- synchronized (jobStatus) {
- jobStatus.put(jobID, JobSchedulingStatus.RUNNING);
- }
- }
-
- /** Records the given (non-null) job as {@code DONE}. */
- @Override
- public void setJobFinished(JobID jobID) {
- checkNotNull(jobID);
-
- synchronized (jobStatus) {
- jobStatus.put(jobID, JobSchedulingStatus.DONE);
- }
- }
-
- /**
- * Returns the recorded status of the given (non-null) job, or {@code PENDING} if no status
- * is recorded (the job was never marked, or its entry was cleared).
- */
- @Override
- public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
- checkNotNull(jobID);
-
- synchronized (jobStatus) {
- JobSchedulingStatus status = jobStatus.get(jobID);
- return status == null ? JobSchedulingStatus.PENDING : status;
- }
- }
-
- /** Removes any recorded status of the given (non-null) job; it then reports {@code PENDING}. */
- @Override
- public void clearJob(JobID jobID) {
- checkNotNull(jobID);
-
- synchronized (jobStatus) {
- jobStatus.remove(jobID);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
new file mode 100644
index 0000000..a338edc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case
+ * where all participants (ResourceManager, JobManagers, TaskManagers) run in the same process.
+ *
+ * <p>This implementation has no dependencies on any external services. Leader election for the
+ * ResourceManager and for every JobManager happens in-process through an
+ * {@link EmbeddedLeaderService}. Checkpoints and metadata are stored simply on the heap or
+ * on a local file system and therefore in a storage without guarantees.
+ */
+public class EmbeddedHaServices extends AbstractNonHaServices {
+
+ /** The executor that dispatches the notifications of all embedded leader services */
+ private final Executor executor;
+
+ /** The in-process leader election service for the ResourceManager */
+ private final EmbeddedLeaderService resourceManagerLeaderService;
+
+ /** The in-process leader election services of the JobManagers, created lazily per job */
+ @GuardedBy("lock")
+ private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
+
+ /**
+ * Creates new embedded HA services.
+ *
+ * @param executor The executor used to dispatch leader election and retrieval notifications
+ */
+ public EmbeddedHaServices(Executor executor) {
+ this.executor = Preconditions.checkNotNull(executor);
+ this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
+ this.jobManagerLeaderServices = new HashMap<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // services
+ // ------------------------------------------------------------------------
+
+ /** Creates a new retrieval service observing the ResourceManager leader. */
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ return resourceManagerLeaderService.createLeaderRetrievalService();
+ }
+
+ /** Creates a new election service through which a ResourceManager contends for leadership. */
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() {
+ return resourceManagerLeaderService.createLeaderElectionService();
+ }
+
+ /**
+ * Creates a new retrieval service observing the JobManager leader of the given job.
+ * The job's leader service is created on first access.
+ */
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+ checkNotNull(jobID);
+
+ synchronized (lock) {
+ checkNotShutdown();
+ EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+ return service.createLeaderRetrievalService();
+ }
+ }
+
+ /**
+ * Creates a new election service through which a JobManager contends for leadership of the
+ * given job. The job's leader service is created on first access.
+ */
+ @Override
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+ checkNotNull(jobID);
+
+ synchronized (lock) {
+ checkNotShutdown();
+ EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+ return service.createLeaderElectionService();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // internal
+ // ------------------------------------------------------------------------
+
+ /** Returns the leader service of the given job, creating and registering it if absent. */
+ @GuardedBy("lock")
+ private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
+ EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
+ if (service == null) {
+ service = new EmbeddedLeaderService(executor);
+ jobManagerLeaderServices.put(jobID, service);
+ }
+ return service;
+ }
+
+ // ------------------------------------------------------------------------
+ // shutdown
+ // ------------------------------------------------------------------------
+
+ /**
+ * Shuts down all JobManager leader services and the ResourceManager leader service, and then
+ * closes the base services.
+ *
+ * <p>A failure while shutting down one service does not prevent the remaining services from
+ * being shut down. The first failure is rethrown at the end, with later failures attached as
+ * suppressed exceptions.
+ */
+ @Override
+ public void close() throws Exception {
+ synchronized (lock) {
+ Exception exception = null;
+
+ if (!isShutDown()) {
+ // stop all job manager leader services
+ for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
+ exception = shutdownCollectingFailure(service, exception);
+ }
+ jobManagerLeaderServices.clear();
+
+ exception = shutdownCollectingFailure(resourceManagerLeaderService, exception);
+ }
+
+ try {
+ super.close();
+ } catch (Exception e) {
+ exception = firstOrSuppressed(e, exception);
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
+ }
+ }
+
+ /** Shuts down the given leader service and folds a failure into the previously collected one. */
+ private static Exception shutdownCollectingFailure(EmbeddedLeaderService service, Exception previous) {
+ try {
+ service.shutdown();
+ return previous;
+ } catch (Exception e) {
+ return firstOrSuppressed(e, previous);
+ }
+ }
+
+ /** Returns the first exception, attaching the newer one to it as suppressed if both exist. */
+ private static Exception firstOrSuppressed(Exception newException, Exception previous) {
+ if (previous == null) {
+ return newException;
+ }
+ previous.addSuppressed(newException);
+ return previous;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
new file mode 100644
index 0000000..5eb4375
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A simple leader election service, which selects a leader among contenders and notifies listeners.
+ *
+ * <p>An election service for contenders can be created via {@link #createLeaderElectionService()},
+ * a listener service for leader observers can be created via {@link #createLeaderRetrievalService()}.
+ */
+public class EmbeddedLeaderService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EmbeddedLeaderService.class);
+
+ private final Object lock = new Object();
+
+ private final Executor notificationExecutor;
+
+ private final Set<EmbeddedLeaderElectionService> allLeaderContenders;
+
+ private final Set<EmbeddedLeaderRetrievalService> listeners;
+
+ /** proposed leader, which has been notified of leadership grant, but has not confirmed */
+ private EmbeddedLeaderElectionService currentLeaderProposed;
+
+ /** actual leader that has confirmed leadership and of which listeners have been notified */
+ private EmbeddedLeaderElectionService currentLeaderConfirmed;
+
+ /** fencing UID for the current leader (or proposed leader) */
+ private UUID currentLeaderSessionId;
+
+ /** the cached address of the current leader */
+ private String currentLeaderAddress;
+
+ /** flag marking the service as terminated */
+ private boolean shutdown;
+
+ // ------------------------------------------------------------------------
+
+ public EmbeddedLeaderService(Executor notificationsDispatcher) {
+ this.notificationExecutor = checkNotNull(notificationsDispatcher);
+ this.allLeaderContenders = new HashSet<>();
+ this.listeners = new HashSet<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // shutdown and errors
+ // ------------------------------------------------------------------------
+
+ /**
+ * Shuts down this leader election service.
+ *
+ * <p>This method does not perform a clean revocation of the leader status and
+ * no notification to any leader listeners. It simply notifies all contenders
+ * and listeners that the service is no longer available.
+ */
+ public void shutdown() {
+ synchronized (lock) {
+ shutdownInternally(new Exception("Leader election service is shutting down"));
+ }
+ }
+
+ private void fatalError(Throwable error) {
+ LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+ synchronized (lock) {
+ shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+ }
+ }
+
+ @GuardedBy("lock")
+ private void shutdownInternally(Exception exceptionForHandlers) {
+ assert Thread.holdsLock(lock);
+
+ if (!shutdown) {
+ // clear all leader status
+ currentLeaderProposed = null;
+ currentLeaderConfirmed = null;
+ currentLeaderSessionId = null;
+ currentLeaderAddress = null;
+
+ // fail all registered listeners
+ for (EmbeddedLeaderElectionService service : allLeaderContenders) {
+ service.shutdown(exceptionForHandlers);
+ }
+ allLeaderContenders.clear();
+
+ // fail all registered listeners
+ for (EmbeddedLeaderRetrievalService service : listeners) {
+ service.shutdown(exceptionForHandlers);
+ }
+ listeners.clear();
+
+ shutdown = true;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // creating contenders and listeners
+ // ------------------------------------------------------------------------
+
+ public LeaderElectionService createLeaderElectionService() {
+ checkState(!shutdown, "leader election service is shut down");
+ return new EmbeddedLeaderElectionService();
+ }
+
+ public LeaderRetrievalService createLeaderRetrievalService() {
+ checkState(!shutdown, "leader election service is shut down");
+ return new EmbeddedLeaderRetrievalService();
+ }
+
+ // ------------------------------------------------------------------------
+ // adding and removing contenders & listeners
+ // ------------------------------------------------------------------------
+
+ /**
+ * Callback from leader contenders when they start their service.
+ */
+ void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
+ synchronized (lock) {
+ checkState(!shutdown, "leader election service is shut down");
+ checkState(!service.running, "leader election service is already started");
+
+ try {
+ if (!allLeaderContenders.add(service)) {
+ throw new IllegalStateException("leader election service was added to this service multiple times");
+ }
+
+ service.contender = contender;
+ service.running = true;
+
+ updateLeader();
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ /**
+ * Callback from leader contenders when they stop their service.
+ */
+ void removeContender(EmbeddedLeaderElectionService service) {
+ synchronized (lock) {
+ // if the service was not even started, simply do nothing
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ if (!allLeaderContenders.remove(service)) {
+ throw new IllegalStateException("leader election service does not belong to this service");
+ }
+
+ // stop the service
+ service.contender = null;
+ service.running = false;
+ service.isLeader = false;
+
+ // if that was the current leader, unset its status
+ if (currentLeaderConfirmed == service) {
+ currentLeaderConfirmed = null;
+ currentLeaderSessionId = null;
+ currentLeaderAddress = null;
+ }
+ if (currentLeaderProposed == service) {
+ currentLeaderProposed = null;
+ currentLeaderSessionId = null;
+ }
+
+ updateLeader();
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ /**
+ * Callback from leader contenders when they confirm a leader grant
+ */
+ void confirmLeader(final EmbeddedLeaderElectionService service, final UUID leaderSessionId) {
+ synchronized (lock) {
+ // if the service was shut down in the meantime, ignore this confirmation
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ // check if the confirmation is for the same grant, or whether it is a stale grant
+ if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
+ final String address = service.contender.getAddress();
+ LOG.info("Received confirmation of leadership for leader {} , session={}", address, leaderSessionId);
+
+ // mark leadership
+ currentLeaderConfirmed = service;
+ currentLeaderAddress = address;
+ currentLeaderProposed = null;
+ service.isLeader = true;
+
+ // notify all listeners
+ for (EmbeddedLeaderRetrievalService listener : listeners) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(address, leaderSessionId, listener.listener, LOG));
+ }
+ }
+ else {
+ LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring.");
+ }
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ @GuardedBy("lock")
+ private void updateLeader() {
+ // this must be called under the lock
+ assert Thread.holdsLock(lock);
+
+ if (currentLeaderConfirmed == null && currentLeaderProposed == null) {
+ // we need a new leader
+ if (allLeaderContenders.isEmpty()) {
+ // no new leader available, tell everyone that there is no leader currently
+ for (EmbeddedLeaderRetrievalService listener : listeners) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(null, null, listener.listener, LOG));
+ }
+ }
+ else {
+ // propose a leader and ask it
+ final UUID leaderSessionId = UUID.randomUUID();
+ EmbeddedLeaderElectionService leaderService = allLeaderContenders.iterator().next();
+
+ currentLeaderSessionId = leaderSessionId;
+ currentLeaderProposed = leaderService;
+
+ LOG.info("Proposing leadership to contender {} @ {}",
+ leaderService.contender, leaderService.contender.getAddress());
+
+ notificationExecutor.execute(
+ new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
+ }
+ }
+ }
+
+ void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
+ synchronized (lock) {
+ checkState(!shutdown, "leader election service is shut down");
+ checkState(!service.running, "leader retrieval service is already started");
+
+ try {
+ if (!listeners.add(service)) {
+ throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+ }
+
+ service.listener = listener;
+ service.running = true;
+
+ // if we already have a leader, immediately notify this new listener
+ if (currentLeaderConfirmed != null) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(currentLeaderAddress, currentLeaderSessionId, listener, LOG));
+ }
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ void removeListener(EmbeddedLeaderRetrievalService service) {
+ synchronized (lock) {
+ // if the service was not even started, simply do nothing
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ if (!listeners.remove(service)) {
+ throw new IllegalStateException("leader retrieval service does not belong to this service");
+ }
+
+ // stop the service
+ service.listener = null;
+ service.running = false;
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // election and retrieval service implementations
+ // ------------------------------------------------------------------------
+
+ private class EmbeddedLeaderElectionService implements LeaderElectionService {
+
+ volatile LeaderContender contender;
+
+ volatile boolean isLeader;
+
+ volatile boolean running;
+
+ @Override
+ public void start(LeaderContender contender) throws Exception {
+ checkNotNull(contender);
+ addContender(this, contender);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ removeContender(this);
+ }
+
+ @Override
+ public void confirmLeaderSessionID(UUID leaderSessionID) {
+ checkNotNull(leaderSessionID);
+ confirmLeader(this, leaderSessionID);
+ }
+
+ @Override
+ public boolean hasLeadership() {
+ return isLeader;
+ }
+
+ void shutdown(Exception cause) {
+ if (running) {
+ running = false;
+ isLeader = false;
+ contender.handleError(cause);
+ contender = null;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+ volatile LeaderRetrievalListener listener;
+
+ volatile boolean running;
+
+ @Override
+ public void start(LeaderRetrievalListener listener) throws Exception {
+ checkNotNull(listener);
+ addListener(this, listener);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ removeListener(this);
+ }
+
+ public void shutdown(Exception cause) {
+ if (running) {
+ running = false;
+ listener.handleError(cause);
+ listener = null;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // asynchronous notifications
+ // ------------------------------------------------------------------------
+
+ private static class NotifyOfLeaderCall implements Runnable {
+
+ @Nullable
+ private final String address; // null if leader revoked without new leader
+ @Nullable
+ private final UUID leaderSessionId; // null if leader revoked without new leader
+
+ private final LeaderRetrievalListener listener;
+ private final Logger logger;
+
+ NotifyOfLeaderCall(
+ @Nullable String address,
+ @Nullable UUID leaderSessionId,
+ LeaderRetrievalListener listener,
+ Logger logger) {
+
+ this.address = address;
+ this.leaderSessionId = leaderSessionId;
+ this.listener = checkNotNull(listener);
+ this.logger = checkNotNull(logger);
+ }
+
+ @Override
+ public void run() {
+ try {
+ listener.notifyLeaderAddress(address, leaderSessionId);
+ }
+ catch (Throwable t) {
+ logger.warn("Error notifying leader listener about new leader", t);
+ listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static class GrantLeadershipCall implements Runnable {
+
+ private final LeaderContender contender;
+ private final UUID leaderSessionId;
+ private final Logger logger;
+
+ GrantLeadershipCall(
+ LeaderContender contender,
+ UUID leaderSessionId,
+ Logger logger) {
+
+ this.contender = checkNotNull(contender);
+ this.leaderSessionId = checkNotNull(leaderSessionId);
+ this.logger = checkNotNull(logger);
+ }
+
+ @Override
+ public void run() {
+ try {
+ contender.grantLeadership(leaderSessionId);
+ }
+ catch (Throwable t) {
+ logger.warn("Error granting leadership to contender", t);
+ contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
new file mode 100644
index 0000000..a56b077
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.leaderelection;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the {@link LeaderElectionService} interface that handles a single
+ * leader contender. When started, this service immediately grants the contender the leadership.
+ *
+ * <p>The implementation accepts a single static leader session ID and is hence compatible with
+ * pre-configured single leader (no leader failover) setups.
+ *
+ * <p>This implementation supports a series of leader listeners that receive notifications about
+ * the leader contender.
+ */
+public class SingleLeaderElectionService implements LeaderElectionService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SingleLeaderElectionService.class);
+
+ // ------------------------------------------------------------------------
+
+ /** lock for all operations on this instance */
+ private final Object lock = new Object();
+
+ /** The executor service that dispatches notifications */
+ private final Executor notificationExecutor;
+
+ /** The leader ID assigned to the immediate leader */
+ private final UUID leaderId;
+
+ @GuardedBy("lock")
+ private final HashSet<EmbeddedLeaderRetrievalService> listeners;
+
+ /** The currently proposed leader */
+ @GuardedBy("lock")
+ private volatile LeaderContender proposedLeader;
+
+ /** The confirmed leader */
+ @GuardedBy("lock")
+ private volatile LeaderContender leader;
+
+ /** The address of the confirmed leader */
+ @GuardedBy("lock")
+ private volatile String leaderAddress;
+
+ /** Flag marking this service as shutdown, meaning it cannot be started again */
+ @GuardedBy("lock")
+ private volatile boolean shutdown;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new leader election service. The service assigns the given leader ID
+ * to the leader contender.
+ *
+ * @param leaderId The constant leader ID assigned to the leader.
+ */
+ public SingleLeaderElectionService(Executor notificationsDispatcher, UUID leaderId) {
+ this.notificationExecutor = checkNotNull(notificationsDispatcher);
+ this.leaderId = checkNotNull(leaderId);
+ this.listeners = new HashSet<>();
+
+ shutdown = false;
+ }
+
+ // ------------------------------------------------------------------------
+ // leader election service
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void start(LeaderContender contender) throws Exception {
+ checkNotNull(contender, "contender");
+
+ synchronized (lock) {
+ checkState(!shutdown, "service is shut down");
+ checkState(proposedLeader == null, "service already started");
+
+ // directly grant leadership to the given contender
+ proposedLeader = contender;
+ notificationExecutor.execute(new GrantLeadershipCall(contender, leaderId));
+ }
+ }
+
+ @Override
+ public void stop() {
+ synchronized (lock) {
+ // notify all listeners that there is no leader
+ for (EmbeddedLeaderRetrievalService listener : listeners) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(null, null, listener.listener, LOG));
+ }
+
+ // if there was a leader, revoke its leadership
+ if (leader != null) {
+ try {
+ leader.revokeLeadership();
+ } catch (Throwable t) {
+ leader.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+ }
+ }
+
+ proposedLeader = null;
+ leader = null;
+ leaderAddress = null;
+ }
+ }
+
+ @Override
+ public void confirmLeaderSessionID(UUID leaderSessionID) {
+ checkNotNull(leaderSessionID, "leaderSessionID");
+ checkArgument(leaderSessionID.equals(leaderId), "confirmed wrong leader session id");
+
+ synchronized (lock) {
+ checkState(!shutdown, "service is shut down");
+ checkState(proposedLeader != null, "no leader proposed yet");
+ checkState(leader == null, "leader already confirmed");
+
+ // accept the confirmation
+ final String address = proposedLeader.getAddress();
+ leaderAddress = address;
+ leader = proposedLeader;
+
+ // notify all listeners
+ for (EmbeddedLeaderRetrievalService listener : listeners) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(address, leaderId, listener.listener, LOG));
+ }
+ }
+ }
+
+ @Override
+ public boolean hasLeadership() {
+ synchronized (lock) {
+ return leader != null;
+ }
+ }
+
+ void errorOnGrantLeadership(LeaderContender contender, Throwable error) {
+ LOG.warn("Error notifying leader listener about new leader", error);
+ contender.handleError(error instanceof Exception ? (Exception) error : new Exception(error));
+
+ synchronized (lock) {
+ if (proposedLeader == contender) {
+ proposedLeader = null;
+ leader = null;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // shutdown
+ // ------------------------------------------------------------------------
+
+ public boolean isShutdown() {
+ return shutdown;
+ }
+
+ public void shutdown() {
+ shutdownInternally(new Exception("The leader service is shutting down"));
+ }
+
+ private void shutdownInternally(Exception exceptionForHandlers) {
+ synchronized (lock) {
+ if (shutdown) {
+ return;
+ }
+
+ shutdown = true;
+
+ // fail the leader (if there is one)
+ if (leader != null) {
+ try {
+ leader.handleError(exceptionForHandlers);
+ } catch (Throwable ignored) {}
+ }
+
+ // clear all leader status
+ leader = null;
+ proposedLeader = null;
+ leaderAddress = null;
+
+ // fail all registered listeners
+ for (EmbeddedLeaderRetrievalService service : listeners) {
+ service.shutdown(exceptionForHandlers);
+ }
+ listeners.clear();
+ }
+ }
+
+ private void fatalError(Throwable error) {
+ LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+ shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+ }
+
+ // ------------------------------------------------------------------------
+ // leader listeners
+ // ------------------------------------------------------------------------
+
+ public LeaderRetrievalService createLeaderRetrievalService() {
+ checkState(!shutdown, "leader election service is shut down");
+ return new EmbeddedLeaderRetrievalService();
+ }
+
+ void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
+ synchronized (lock) {
+ checkState(!shutdown, "leader election service is shut down");
+ checkState(!service.running, "leader retrieval service is already started");
+
+ try {
+ if (!listeners.add(service)) {
+ throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+ }
+
+ service.listener = listener;
+ service.running = true;
+
+ // if we already have a leader, immediately notify this new listener
+ if (leader != null) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(leaderAddress, leaderId, listener, LOG));
+ }
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ void removeListener(EmbeddedLeaderRetrievalService service) {
+ synchronized (lock) {
+ // if the service was not even started, simply do nothing
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ if (!listeners.remove(service)) {
+ throw new IllegalStateException("leader retrieval service does not belong to this service");
+ }
+
+ // stop the service
+ service.listener = null;
+ service.running = false;
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+ volatile LeaderRetrievalListener listener;
+
+ volatile boolean running;
+
+ @Override
+ public void start(LeaderRetrievalListener listener) throws Exception {
+ checkNotNull(listener);
+ addListener(this, listener);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ removeListener(this);
+ }
+
+ void shutdown(Exception cause) {
+ if (running) {
+ final LeaderRetrievalListener lst = listener;
+ running = false;
+ listener = null;
+
+ try {
+ lst.handleError(cause);
+ } catch (Throwable ignored) {}
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // asynchronous notifications
+ // ------------------------------------------------------------------------
+
+ /**
+ * This runnable informs a leader contender that it gained leadership.
+ */
+ private class GrantLeadershipCall implements Runnable {
+
+ private final LeaderContender contender;
+ private final UUID leaderSessionId;
+
+ GrantLeadershipCall(LeaderContender contender, UUID leaderSessionId) {
+
+ this.contender = checkNotNull(contender);
+ this.leaderSessionId = checkNotNull(leaderSessionId);
+ }
+
+ @Override
+ public void run() {
+ try {
+ contender.grantLeadership(leaderSessionId);
+ }
+ catch (Throwable t) {
+ errorOnGrantLeadership(contender, t);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * This runnable informs a leader listener of a new leader
+ */
+ private static class NotifyOfLeaderCall implements Runnable {
+
+ @Nullable
+ private final String address; // null if leader revoked without new leader
+ @Nullable
+ private final UUID leaderSessionId; // null if leader revoked without new leader
+
+ private final LeaderRetrievalListener listener;
+ private final Logger logger;
+
+ NotifyOfLeaderCall(
+ @Nullable String address,
+ @Nullable UUID leaderSessionId,
+ LeaderRetrievalListener listener,
+ Logger logger) {
+
+ this.address = address;
+ this.leaderSessionId = leaderSessionId;
+ this.listener = checkNotNull(listener);
+ this.logger = checkNotNull(logger);
+ }
+
+ @Override
+ public void run() {
+ try {
+ listener.notifyLeaderAddress(address, leaderSessionId);
+ }
+ catch (Throwable t) {
+ logger.warn("Error notifying leader listener about new leader", t);
+ listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+ }
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
new file mode 100644
index 0000000..c7b54c3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.standalone;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case.
+ * This implementation can be used for testing, and for cluster setups that do not
+ * tolerate failures of the master processes (JobManager, ResourceManager).
+ *
+ * <p>This implementation has no dependencies on any external services. It returns fixed,
+ * pre-configured ResourceManager and JobManager addresses, and stores checkpoints and metadata
+ * simply on the heap or on a local file system and therefore in a storage without guarantees.
+ *
+ * <p>Every service getter calls {@code checkNotShutdown()} while holding the inherited
+ * {@code lock} before creating the requested service.
+ */
+public class StandaloneHaServices extends AbstractNonHaServices {
+
+ /** The fixed address of the ResourceManager. */
+ private final String resourceManagerAddress;
+
+ /** The fixed address of the JobManager. */
+ private final String jobManagerAddress;
+
+ /**
+ * Creates a new services class for the fixed, pre-defined leaders.
+ *
+ * @param resourceManagerAddress The fixed address of the ResourceManager
+ * @param jobManagerAddress The fixed address of the JobManager (used for every job)
+ */
+ public StandaloneHaServices(String resourceManagerAddress, String jobManagerAddress) {
+ this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
+ this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
+ }
+
+ // ------------------------------------------------------------------------
+ // Services
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns a new {@link StandaloneLeaderRetrievalService} for the fixed ResourceManager
+ * address and {@code DEFAULT_LEADER_ID}.
+ */
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ synchronized (lock) {
+ checkNotShutdown();
+
+ return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
+ }
+ }
+
+ /** Returns a new {@link StandaloneLeaderElectionService} for the ResourceManager. */
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() {
+ synchronized (lock) {
+ checkNotShutdown();
+
+ return new StandaloneLeaderElectionService();
+ }
+ }
+
+ /**
+ * Returns a new {@link StandaloneLeaderRetrievalService} for the fixed JobManager address and
+ * {@code DEFAULT_LEADER_ID}. The job ID is not used: all jobs share the same JobManager address.
+ */
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+ synchronized (lock) {
+ checkNotShutdown();
+
+ return new StandaloneLeaderRetrievalService(jobManagerAddress, DEFAULT_LEADER_ID);
+ }
+ }
+
+ /**
+ * Returns a new {@link StandaloneLeaderElectionService}. The job ID is not used.
+ */
+ @Override
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+ synchronized (lock) {
+ checkNotShutdown();
+
+ return new StandaloneLeaderElectionService();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
new file mode 100644
index 0000000..585ef34
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.standalone;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+
+import java.util.HashMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RunningJobsRegistry} that keeps the scheduling status of each job in a local
+ * in-memory map. It is not highly available: the recorded statuses exist only in this JVM.
+ */
+public class StandaloneRunningJobsRegistry implements RunningJobsRegistry {
+
+ /** Scheduling status per job; every access synchronizes on this map. */
+ private final HashMap<JobID, JobSchedulingStatus> statusByJob = new HashMap<>();
+
+ @Override
+ public void setJobRunning(JobID jobID) {
+ checkNotNull(jobID);
+ recordStatus(jobID, JobSchedulingStatus.RUNNING);
+ }
+
+ @Override
+ public void setJobFinished(JobID jobID) {
+ checkNotNull(jobID);
+ recordStatus(jobID, JobSchedulingStatus.DONE);
+ }
+
+ /**
+ * Returns the recorded status of the job, or {@code PENDING} if none has been recorded
+ * (never registered, or cleared via {@link #clearJob(JobID)}).
+ */
+ @Override
+ public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
+ checkNotNull(jobID);
+
+ final JobSchedulingStatus recorded;
+ synchronized (statusByJob) {
+ recorded = statusByJob.get(jobID);
+ }
+
+ if (recorded == null) {
+ return JobSchedulingStatus.PENDING;
+ }
+ return recorded;
+ }
+
+ @Override
+ public void clearJob(JobID jobID) {
+ checkNotNull(jobID);
+
+ synchronized (statusByJob) {
+ statusByJob.remove(jobID);
+ }
+ }
+
+ /** Stores the given status for the job while holding the map's monitor. */
+ private void recordStatus(JobID jobID, JobSchedulingStatus status) {
+ synchronized (statusByJob) {
+ statusByJob.put(jobID, status);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
new file mode 100644
index 0000000..5d895c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.FileSystemBlobStore;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
+ * The services store data in ZooKeeper's nodes as illustrated by teh following tree structure:
+ *
+ * <pre>
+ * /flink
+ * +/cluster_id_1/resource_manager_lock
+ * | |
+ * | +/job-id-1/job_manager_lock
+ * | | /checkpoints/latest
+ * | | /latest-1
+ * | | /latest-2
+ * | |
+ * | +/job-id-2/job_manager_lock
+ * |
+ * +/cluster_id_2/resource_manager_lock
+ * |
+ * +/job-id-1/job_manager_lock
+ * |/checkpoints/latest
+ * | /latest-1
+ * |/persisted_job_graph
+ * </pre>
+ *
+ * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
+ * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
+ * accommodate specific permission.
+ *
+ * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster".
+ * This "cluster" can be either a standalone or containerized Flink cluster, or it can be job
+ * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
+ *
+ * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
+ * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
+ *
+ * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
+ * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
+ * cluster and participate in the execution of the same set of jobs.
+ */
+public class ZooKeeperHaServices implements HighAvailabilityServices {
+
+ private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
+
+ private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
+
+ // ------------------------------------------------------------------------
+
+
+ /** The ZooKeeper client to use */
+ private final CuratorFramework client;
+
+ /** The executor to run ZooKeeper callbacks on */
+ private final Executor executor;
+
+ /** The runtime configuration */
+ private final Configuration configuration;
+
+ /** The zookeeper based running jobs registry */
+ private final RunningJobsRegistry runningJobsRegistry;
+
+ public ZooKeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) {
+ this.client = checkNotNull(client);
+ this.executor = checkNotNull(executor);
+ this.configuration = checkNotNull(configuration);
+ this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);
+ }
+
+ // ------------------------------------------------------------------------
+ // Services
+ // ------------------------------------------------------------------------
+
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
+ }
+
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+ return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
+ }
+
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() {
+ return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
+ }
+
+ @Override
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+ return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
+ }
+
+ @Override
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+ return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
+ }
+
+ @Override
+ public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+ return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
+ }
+
+ @Override
+ public RunningJobsRegistry getRunningJobsRegistry() {
+ return runningJobsRegistry;
+ }
+
+ @Override
+ public BlobStore createBlobStore() throws IOException {
+ return createBlobStore(configuration);
+ }
+
+ /**
+ * Creates the BLOB store in which BLOBs are stored in a highly-available
+ * fashion.
+ *
+ * @param configuration configuration to extract the storage path from
+ * @return Blob store
+ * @throws IOException if the blob store could not be created
+ */
+ public static BlobStore createBlobStore(
+ final Configuration configuration) throws IOException {
+ String storagePath = configuration.getValue(
+ HighAvailabilityOptions.HA_STORAGE_PATH);
+ if (isNullOrWhitespaceOnly(storagePath)) {
+ throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
+ HighAvailabilityOptions.HA_STORAGE_PATH);
+ }
+
+ final Path path;
+ try {
+ path = new Path(storagePath);
+ } catch (Exception e) {
+ throw new IOException("Invalid path for highly available storage (" +
+ HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+ }
+
+ final FileSystem fileSystem;
+ try {
+ fileSystem = path.getFileSystem();
+ } catch (Exception e) {
+ throw new IOException("Could not create FileSystem for highly available storage (" +
+ HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+ }
+
+ final String clusterId =
+ configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+ storagePath += "/" + clusterId;
+
+ return new FileSystemBlobStore(fileSystem, storagePath);
+ }
+
+ // ------------------------------------------------------------------------
+ // Shutdown
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void close() throws Exception {
+ client.close();
+ }
+
+ @Override
+ public void closeAndCleanupAllData() throws Exception {
+ close();
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static String getPathForJobManager(final JobID jobID) {
+ return "/" + jobID + JOB_MANAGER_LEADER_PATH;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
new file mode 100644
index 0000000..8a083d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A zookeeper based registry for running jobs, highly available.
+ *
+ * <p>The status of each job is stored as the UTF-8 encoded name of its
+ * {@link JobSchedulingStatus} in a ZooKeeper node whose path is the configured
+ * {@link HighAvailabilityOptions#ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH} concatenated directly
+ * with the job ID (no separator is inserted; presumably the configured value ends with "/").
+ * A job whose node is missing or holds no status is reported as {@code PENDING}.
+ */
+public class ZooKeeperRunningJobsRegistry implements RunningJobsRegistry {
+
+ private static final Charset ENCODING = StandardCharsets.UTF_8;
+
+ /** The ZooKeeper client to use */
+ private final CuratorFramework client;
+
+ /** Path prefix of the per-job status nodes. */
+ private final String runningJobPath;
+
+ public ZooKeeperRunningJobsRegistry(final CuratorFramework client, final Configuration configuration) {
+ this.client = checkNotNull(client, "client");
+ this.runningJobPath = configuration.getString(HighAvailabilityOptions.ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH);
+ }
+
+ @Override
+ public void setJobRunning(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ try {
+ writeEnumToZooKeeper(jobID, JobSchedulingStatus.RUNNING);
+ }
+ catch (Exception e) {
+ throw new IOException("Failed to set RUNNING state in ZooKeeper for job " + jobID, e);
+ }
+ }
+
+ @Override
+ public void setJobFinished(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ try {
+ writeEnumToZooKeeper(jobID, JobSchedulingStatus.DONE);
+ }
+ catch (Exception e) {
+ throw new IOException("Failed to set DONE state in ZooKeeper for job " + jobID, e);
+ }
+ }
+
+ /**
+ * Reads the job's status from ZooKeeper.
+ *
+ * @return the stored status, or {@code PENDING} if no node or no status content exists
+ * @throws IOException if ZooKeeper access fails or the stored data is not a valid status name
+ */
+ @Override
+ public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ try {
+ final String zkPath = createZkPath(jobID);
+ final Stat stat = client.checkExists().forPath(zkPath);
+ if (stat != null) {
+ // found some data, try to parse it
+ final byte[] data = client.getData().forPath(zkPath);
+ // a node without content has no status written yet, e.g. when read between the
+ // ensure-path step and setData() in writeEnumToZooKeeper(), or inside clearJob()
+ if (data != null && data.length > 0) {
+ try {
+ final String name = new String(data, ENCODING);
+ return JobSchedulingStatus.valueOf(name);
+ }
+ catch (IllegalArgumentException e) {
+ throw new IOException("Found corrupt data in ZooKeeper: " +
+ Arrays.toString(data) + " is no valid job status", e);
+ }
+ }
+ }
+
+ // nothing found, yet, must be in status 'PENDING'
+ return JobSchedulingStatus.PENDING;
+ }
+ catch (KeeperException.NoNodeException e) {
+ // the node was removed (e.g. by clearJob) between the existence check and the read
+ return JobSchedulingStatus.PENDING;
+ }
+ catch (Exception e) {
+ throw new IOException("Failed to retrieve job scheduling status from ZooKeeper for job " + jobID, e);
+ }
+ }
+
+ @Override
+ public void clearJob(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ try {
+ final String zkPath = createZkPath(jobID);
+ // ensure the node exists first so that the subsequent delete does not fail on a missing node
+ this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+ this.client.delete().forPath(zkPath);
+ }
+ catch (Exception e) {
+ throw new IOException("Failed to clear job state from ZooKeeper for job " + jobID, e);
+ }
+ }
+
+ private String createZkPath(JobID jobID) {
+ return runningJobPath + jobID.toString();
+ }
+
+ /** Ensures the job's node exists, then stores the status name as its data. */
+ private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
+ final String zkPath = createZkPath(jobID);
+ this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+ this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e60ff77..3a55f2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -64,7 +64,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
@@ -88,8 +87,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -127,6 +126,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class JobMaster extends RpcEndpoint<JobMasterGateway> {
+ /** Default names for Flink's distributed components */
+ public static final String JOB_MANAGER_NAME = "jobmanager";
+ public static final String ARCHIVE_NAME = "archive";
+
private static final AtomicReferenceFieldUpdater<JobMaster, UUID> LEADER_ID_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(JobMaster.class, UUID.class, "leaderSessionID");
@@ -208,7 +211,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
FatalErrorHandler errorHandler,
ClassLoader userCodeLoader) throws Exception {
- super(rpcService, RpcServiceUtils.createRandomName(JobManager.JOB_MANAGER_NAME()));
+ super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
this.resourceId = checkNotNull(resourceId);
this.jobGraph = checkNotNull(jobGraph);
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 03fbef5..3e7f2f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -235,7 +235,9 @@ public class MiniCluster {
// create the high-availability services
LOG.info("Starting high-availability services");
- haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+ haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+ configuration,
+ commonRpcService.getExecutor());
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 5d9db19..f9cf01d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -109,7 +109,7 @@ public class QueryableStateClient {
// Create a leader retrieval service
LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils
- .createLeaderRetrievalService(config);
+ .createLeaderRetrievalService(config, true);
// Get the ask timeout
String askTimeoutString = config.getString(
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 2ef8276..bef0aa3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -87,6 +87,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
extends RpcEndpoint<ResourceManagerGateway>
implements LeaderContender {
+ public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
+
/** Unique id of the resource manager */
private final ResourceID resourceId;
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
index fa75bbb..2c64f08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.Duration;
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
index 79d1c02..d4bc2d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.Duration;
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
deleted file mode 100644
index 0007318..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.exceptions;
-
-/**
- * Exception which occures when creating a configuration object fails.
- */
-public class ConfigurationException extends Exception {
- private static final long serialVersionUID = 3971647332059381556L;
-
- public ConfigurationException(String message) {
- super(message);
- }
-
- public ConfigurationException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ConfigurationException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index d21c251..a651168 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.Duration;
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
deleted file mode 100644
index e555e7f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.flink.util.Preconditions;
-import org.jboss.netty.channel.ChannelException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * These RPC utilities contain helper methods around RPC use, such as starting an RPC service,
- * or constructing RPC addresses.
- */
-public class RpcServiceUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);
-
- private static final AtomicLong nextNameOffset = new AtomicLong(0L);
-
- // ------------------------------------------------------------------------
- // RPC instantiation
- // ------------------------------------------------------------------------
-
- /**
- * Utility method to create RPC service from configuration and hostname, port.
- *
- * @param hostname The hostname/address that describes the TaskManager's data location.
- * @param port If true, the TaskManager will not initiate the TCP network stack.
- * @param configuration The configuration for the TaskManager.
- * @return The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
- * @throws IOException Thrown, if the actor system can not bind to the address
- * @throws Exception Thrown is some other error occurs while creating akka actor system
- */
- public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
- LOG.info("Starting AkkaRpcService at {}.", NetUtils.hostAndPortToUrlString(hostname, port));
-
- final ActorSystem actorSystem;
-
- try {
- Config akkaConfig;
-
- if (hostname != null && !hostname.isEmpty()) {
- // remote akka config
- akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
- } else {
- // local akka config
- akkaConfig = AkkaUtils.getAkkaConfig(configuration);
- }
-
- LOG.debug("Using akka configuration \n {}.", akkaConfig);
-
- actorSystem = AkkaUtils.createActorSystem(akkaConfig);
- } catch (Throwable t) {
- if (t instanceof ChannelException) {
- Throwable cause = t.getCause();
- if (cause != null && t.getCause() instanceof java.net.BindException) {
- String address = NetUtils.hostAndPortToUrlString(hostname, port);
- throw new IOException("Unable to bind AkkaRpcService actor system to address " +
- address + " - " + cause.getMessage(), t);
- }
- }
- throw new Exception("Could not create TaskManager actor system", t);
- }
-
- final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
- return new AkkaRpcService(actorSystem, timeout);
- }
-
- // ------------------------------------------------------------------------
- // RPC endpoint addressing
- // ------------------------------------------------------------------------
-
- /**
- *
- * @param hostname The hostname or address where the target RPC service is listening.
- * @param port The port where the target RPC service is listening.
- * @param endpointName The name of the RPC endpoint.
- * @param config The configuration from which to deduce further settings.
- *
- * @return The RPC URL of the specified RPC endpoint.
- */
- public static String getRpcUrl(String hostname, int port, String endpointName, Configuration config)
- throws UnknownHostException {
-
- checkNotNull(config, "config is null");
-
- final boolean sslEnabled = config.getBoolean(
- ConfigConstants.AKKA_SSL_ENABLED,
- ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
- SSLUtils.getSSLEnabled(config);
-
- return getRpcUrl(hostname, port, endpointName, sslEnabled);
- }
-
- /**
- *
- * @param hostname The hostname or address where the target RPC service is listening.
- * @param port The port where the target RPC service is listening.
- * @param endpointName The name of the RPC endpoint.
- * @param secure True, if security/encryption is enabled, false otherwise.
- *
- * @return The RPC URL of the specified RPC endpoint.
- */
- public static String getRpcUrl(String hostname, int port, String endpointName, boolean secure)
- throws UnknownHostException {
-
- checkNotNull(hostname, "hostname is null");
- checkNotNull(endpointName, "endpointName is null");
- checkArgument(port > 0 && port <= 65535, "port must be in [1, 65535]");
-
- final String protocol = secure ? "akka.ssl.tcp" : "akka.tcp";
- final String hostPort = NetUtils.hostAndPortToUrlString(hostname, port);
-
- return String.format("%s://flink@%s/user/%s", protocol, hostPort, endpointName);
- }
-
- /**
- * Creates a random name of the form prefix_X, where X is an increasing number.
- *
- * @param prefix Prefix string to prepend to the monotonically increasing name offset number
- * @return A random name of the form prefix_X where X is an increasing number
- */
- public static String createRandomName(String prefix) {
- Preconditions.checkNotNull(prefix, "Prefix must not be null.");
-
- long nameOffset;
-
- // obtain the next name offset by incrementing it atomically
- do {
- nameOffset = nextNameOffset.get();
- } while (!nextNameOffset.compareAndSet(nameOffset, nameOffset + 1L));
-
- return prefix + '_' + nameOffset;
- }
-
- // ------------------------------------------------------------------------
-
- /** This class is not meant to be instantiated */
- private RpcServiceUtils() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
new file mode 100644
index 0000000..eab4de8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.util.Preconditions;
+import org.jboss.netty.channel.ChannelException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * These RPC utilities contain helper methods around RPC use, such as starting an RPC service,
+ * or constructing RPC addresses.
+ */
+public class AkkaRpcServiceUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServiceUtils.class);
+
+ /** URL scheme of plain (unencrypted) Akka TCP connections. */
+ private static final String AKKA_TCP = "akka.tcp";
+
+ /** URL scheme of SSL-encrypted Akka TCP connections. */
+ private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
+
+ /** Counter providing the increasing suffixes of {@link #createRandomName(String)}. */
+ private static final AtomicLong nextNameOffset = new AtomicLong(0L);
+
+ // ------------------------------------------------------------------------
+ // RPC instantiation
+ // ------------------------------------------------------------------------
+
+ /**
+ * Utility method to create RPC service from configuration and hostname, port.
+ *
+ * <p>If a non-empty hostname is given, the actor system is started with a remote Akka
+ * configuration bound to that hostname and port; otherwise a local Akka configuration is used.
+ *
+ * @param hostname The hostname/address that describes the TaskManager's data location.
+ * @param port The port the actor system binds to (only used if a hostname is given).
+ * @param configuration The configuration for the TaskManager.
+ * @return The rpc service which is used to start and connect to the TaskManager RpcEndpoint.
+ * @throws IOException Thrown, if the actor system can not bind to the address
+ * @throws Exception Thrown if some other error occurs while creating akka actor system
+ */
+ public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
+ LOG.info("Starting AkkaRpcService at {}.", NetUtils.hostAndPortToUrlString(hostname, port));
+
+ final ActorSystem actorSystem;
+
+ try {
+ Config akkaConfig;
+
+ if (hostname != null && !hostname.isEmpty()) {
+ // remote akka config
+ akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+ } else {
+ // local akka config
+ akkaConfig = AkkaUtils.getAkkaConfig(configuration);
+ }
+
+ LOG.debug("Using akka configuration \n {}.", akkaConfig);
+
+ actorSystem = AkkaUtils.createActorSystem(akkaConfig);
+ } catch (Throwable t) {
+ if (t instanceof ChannelException) {
+ Throwable cause = t.getCause();
+ // instanceof is false for null, so no separate null check is needed
+ if (cause instanceof java.net.BindException) {
+ String address = NetUtils.hostAndPortToUrlString(hostname, port);
+ throw new IOException("Unable to bind AkkaRpcService actor system to address " +
+ address + " - " + cause.getMessage(), t);
+ }
+ }
+ throw new Exception("Could not create TaskManager actor system", t);
+ }
+
+ final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
+ return new AkkaRpcService(actorSystem, timeout);
+ }
+
+ // ------------------------------------------------------------------------
+ // RPC endpoint addressing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates the RPC URL of an endpoint, using SSL if it is enabled in the given configuration.
+ *
+ * @param hostname The hostname or address where the target RPC service is listening.
+ * @param port The port where the target RPC service is listening.
+ * @param endpointName The name of the RPC endpoint.
+ * @param addressResolution Whether to try to resolve the hostname before building the URL.
+ * @param config The configuration from which to deduce further settings.
+ *
+ * @return The RPC URL of the specified RPC endpoint.
+ * @throws UnknownHostException Thrown, if address resolution is requested and the hostname
+ * cannot be resolved.
+ */
+ public static String getRpcUrl(
+ String hostname,
+ int port,
+ String endpointName,
+ HighAvailabilityServicesUtils.AddressResolution addressResolution,
+ Configuration config) throws UnknownHostException {
+
+ checkNotNull(config, "config is null");
+
+ final boolean sslEnabled = config.getBoolean(
+ ConfigConstants.AKKA_SSL_ENABLED,
+ ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+ SSLUtils.getSSLEnabled(config);
+
+ return getRpcUrl(
+ hostname,
+ port,
+ endpointName,
+ addressResolution,
+ sslEnabled ? AkkaProtocol.SSL_TCP : AkkaProtocol.TCP);
+ }
+
+ /**
+ * Creates the RPC URL of an endpoint, of the form
+ * {@code <protocol>://flink@<hostPort>/user/<endpointName>}.
+ *
+ * @param hostname The hostname or address where the target RPC service is listening.
+ * @param port The port where the target RPC service is listening.
+ * @param endpointName The name of the RPC endpoint.
+ * @param addressResolution Whether to try to resolve the hostname before building the URL.
+ * @param akkaProtocol The Akka protocol to use: plain TCP or SSL over TCP.
+ *
+ * @return The RPC URL of the specified RPC endpoint.
+ * @throws UnknownHostException Thrown, if address resolution is requested and the hostname
+ * cannot be resolved.
+ */
+ public static String getRpcUrl(
+ String hostname,
+ int port,
+ String endpointName,
+ HighAvailabilityServicesUtils.AddressResolution addressResolution,
+ AkkaProtocol akkaProtocol) throws UnknownHostException {
+
+ checkNotNull(hostname, "hostname is null");
+ checkNotNull(endpointName, "endpointName is null");
+ // reject null instead of silently falling back to unencrypted TCP
+ checkNotNull(akkaProtocol, "akkaProtocol is null");
+ checkArgument(port > 0 && port <= 65535, "port must be in [1, 65535]");
+
+ final String protocolPrefix = akkaProtocol == AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
+
+ if (addressResolution == AddressResolution.TRY_ADDRESS_RESOLUTION) {
+ // Fail fast if the hostname cannot be resolved
+ //noinspection ResultOfMethodCallIgnored
+ InetAddress.getByName(hostname);
+ }
+
+ final String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port);
+
+ return String.format("%s://flink@%s/user/%s", protocolPrefix, hostPort, endpointName);
+ }
+
+ /**
+ * The transport protocols an Akka RPC URL can use.
+ */
+ public enum AkkaProtocol {
+ /** Plain, unencrypted TCP ({@code akka.tcp}). */
+ TCP,
+ /** SSL-encrypted TCP ({@code akka.ssl.tcp}). */
+ SSL_TCP
+ }
+
+ /**
+ * Creates a random name of the form prefix_X, where X is an increasing number.
+ *
+ * @param prefix Prefix string to prepend to the monotonically increasing name offset number
+ * @return A random name of the form prefix_X where X is an increasing number
+ */
+ public static String createRandomName(String prefix) {
+ Preconditions.checkNotNull(prefix, "Prefix must not be null.");
+
+ // atomically obtain the current offset and advance the counter for the next caller
+ final long nameOffset = nextNameOffset.getAndIncrement();
+
+ return prefix + '_' + nameOffset;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** This class is not meant to be instantiated. */
+ private AkkaRpcServiceUtils() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5b8c8ee..d05d900 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -65,7 +65,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
@@ -85,7 +85,6 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
@@ -111,6 +110,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
+ public static final String TASK_MANAGER_NAME = "taskmanager";
+
/** The connection information of this task manager */
private final TaskManagerLocation taskManagerLocation;
@@ -186,7 +187,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
JobLeaderService jobLeaderService,
FatalErrorHandler fatalErrorHandler) {
- super(rpcService, RpcServiceUtils.createRandomName(TaskManager.TASK_MANAGER_NAME()));
+ super(rpcService, AkkaRpcServiceUtils.createRandomName(TaskExecutor.TASK_MANAGER_NAME));
checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");
http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 2be8ff1..2ed1578 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -219,6 +219,6 @@ public class TaskManagerRunner implements FatalErrorHandler {
"use 0 to let the system choose port automatically.",
ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
- return RpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
+ return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
}
}