You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:10 UTC
[05/50] [abbrv] flink git commit: [FLINK-4355] [cluster management]
Implement TaskManager side of registration at ResourceManager.
[FLINK-4355] [cluster management] Implement TaskManager side of registration at ResourceManager.
This closes #2353
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24eab760
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24eab760
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24eab760
Branch: refs/heads/flip-6
Commit: 24eab760c6543f2dc8b83ff85dfb94670fa83444
Parents: 59cc176
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 10 20:42:45 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:41 2016 +0200
----------------------------------------------------------------------
.../HighAvailabilityServices.java | 39 +++
.../runtime/highavailability/NonHaServices.java | 59 ++++
.../StandaloneLeaderRetrievalService.java | 72 +++--
.../apache/flink/runtime/rpc/RpcEndpoint.java | 1 -
.../apache/flink/runtime/rpc/RpcService.java | 27 ++
.../flink/runtime/rpc/akka/AkkaRpcService.java | 18 ++
.../runtime/rpc/akka/messages/RunAsync.java | 1 +
.../rpc/registration/RegistrationResponse.java | 84 ++++++
.../rpc/registration/RetryingRegistration.java | 292 +++++++++++++++++++
.../rpc/resourcemanager/ResourceManager.java | 23 ++
.../resourcemanager/ResourceManagerGateway.java | 21 +-
.../runtime/rpc/taskexecutor/SlotReport.java | 38 +++
.../runtime/rpc/taskexecutor/TaskExecutor.java | 169 ++++++++---
.../rpc/taskexecutor/TaskExecutorGateway.java | 29 +-
.../TaskExecutorRegistrationSuccess.java | 75 +++++
...TaskExecutorToResourceManagerConnection.java | 194 ++++++++++++
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 51 +++-
.../rpc/taskexecutor/TaskExecutorTest.java | 87 +-----
18 files changed, 1105 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
new file mode 100644
index 0000000..094d36f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * This class gives access to all services needed for
+ *
+ * <ul>
+ * <li>ResourceManager leader election and leader retrieval</li>
+ * <li>JobManager leader election and leader retrieval</li>
+ * <li>Persistence for checkpoint metadata</li>
+ * <li>Registering the latest completed checkpoint(s)</li>
+ * </ul>
+ */
+public interface HighAvailabilityServices {
+
+ /**
+ * Gets the leader retriever for the cluster's resource manager.
+ */
+ LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
new file mode 100644
index 0000000..b8c2ed8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case.
+ * This implementation can be used for testing, and for cluster setups that do not
+ * tolerate failures of the master processes (JobManager, ResourceManager).
+ *
+ * <p>This implementation has no dependencies on any external services. It returns fixed,
+ * pre-configured leaders, and stores checkpoints and metadata simply on the heap and therefore
+ * in volatile memory.
+ */
+public class NonHaServices implements HighAvailabilityServices {
+
+ /** The fixed address of the ResourceManager */
+ private final String resourceManagerAddress;
+
+ /**
+ * Creates a new services class for the fixed, pre-defined leaders.
+ *
+ * @param resourceManagerAddress The fixed address of the ResourceManager
+ */
+ public NonHaServices(String resourceManagerAddress) {
+ this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
+ }
+
+ // ------------------------------------------------------------------------
+ // Services
+ // ------------------------------------------------------------------------
+
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+ return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index 26a34aa..16b163c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -18,44 +18,74 @@
package org.apache.flink.runtime.leaderretrieval;
-import org.apache.flink.util.Preconditions;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
- * Standalone implementation of the {@link LeaderRetrievalService}. The standalone implementation
- * assumes that there is only a single {@link org.apache.flink.runtime.jobmanager.JobManager} whose
- * address is given to the service when creating it. This address is directly given to the
- * {@link LeaderRetrievalListener} when the service is started.
+ * Standalone implementation of the {@link LeaderRetrievalService}. This implementation
+ * assumes that there is only a single contender for leadership
+ * (e.g., a single JobManager or ResourceManager process) and that this process is
+ * reachable under a constant address.
+ *
+ * <p>As soon as this service is started, it immediately notifies the leader listener
+ * of the leader contender with the pre-configured address.
*/
public class StandaloneLeaderRetrievalService implements LeaderRetrievalService {
- /** Address of the only JobManager */
- private final String jobManagerAddress;
+ private final Object startStopLock = new Object();
+
+ /** The fixed address of the leader */
+ private final String leaderAddress;
+
+ /** The fixed leader ID (leader lock fencing token) */
+ private final UUID leaderId;
- /** Listener which wants to be notified about the new leader */
- private LeaderRetrievalListener leaderListener;
+ /** Flag whether this service is started */
+ private boolean started;
/**
- * Creates a StandaloneLeaderRetrievalService with the given JobManager address.
+ * Creates a StandaloneLeaderRetrievalService with the given leader address.
+ * The leaderId will be null.
*
- * @param jobManagerAddress The JobManager's address which is returned to the
- * {@link LeaderRetrievalListener}
+ * @param leaderAddress The leader's pre-configured address
*/
- public StandaloneLeaderRetrievalService(String jobManagerAddress) {
- this.jobManagerAddress = jobManagerAddress;
+ public StandaloneLeaderRetrievalService(String leaderAddress) {
+ this.leaderAddress = checkNotNull(leaderAddress);
+ this.leaderId = null;
}
+ /**
+ * Creates a StandaloneLeaderRetrievalService with the given leader address.
+ *
+ * @param leaderAddress The leader's pre-configured address
+ * @param leaderId The constant leaderId.
+ */
+ public StandaloneLeaderRetrievalService(String leaderAddress, UUID leaderId) {
+ this.leaderAddress = checkNotNull(leaderAddress);
+ this.leaderId = checkNotNull(leaderId);
+ }
+
+ // ------------------------------------------------------------------------
+
@Override
public void start(LeaderRetrievalListener listener) {
- Preconditions.checkNotNull(listener, "Listener must not be null.");
- Preconditions.checkState(leaderListener == null, "StandaloneLeaderRetrievalService can " +
- "only be started once.");
+ checkNotNull(listener, "Listener must not be null.");
- leaderListener = listener;
+ synchronized (startStopLock) {
+ checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
+ started = true;
- // directly notify the listener, because we already know the leading JobManager's address
- leaderListener.notifyLeaderAddress(jobManagerAddress, null);
+ // directly notify the listener, because we already know the leader's address
+ listener.notifyLeaderAddress(leaderAddress, leaderId);
+ }
}
@Override
- public void stop() {}
+ public void stop() {
+ synchronized (startStopLock) {
+ started = false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 67ac182..a28bc14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -237,7 +237,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* }</pre>
*/
public void validateRunsInMainThread() {
- // because the initialization is lazy, it can be that certain methods are
assert currentMainThread.get() == Thread.currentThread();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index f93be83..fabdb05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,8 +18,11 @@
package org.apache.flink.runtime.rpc;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
/**
* Interface for rpc services. An rpc service is used to start and connect to a {@link RpcEndpoint}.
* Connecting to a rpc server will return a {@link RpcGateway} which can be used to call remote
@@ -71,4 +74,28 @@ public interface RpcService {
* @return Fully qualified address
*/
<C extends RpcGateway> String getAddress(C selfGateway);
+
+ /**
+ * Gets the execution context, provided by this RPC service. This execution
+ * context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)}
+ * methods of Futures.
+ *
+ * <p><b>IMPORTANT:</b> This execution context does not isolate the method invocations against
+ * any concurrent invocations and is therefore not suitable to run completion methods of futures
+ * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
+ * {@link RpcEndpoint#getMainThreadExecutionContext() MainThreadExecutionContext} of that
+ * {@code RpcEndpoint}.
+ *
+ * @return The execution context provided by the RPC service
+ */
+ ExecutionContext getExecutionContext();
+
+ /**
+ * Execute the runnable in the execution context of this RPC Service, as returned by
+ * {@link #getExecutionContext()}, after a scheduled delay.
+ *
+ * @param runnable Runnable to be executed
+ * @param delay The delay after which the runnable will be executed, expressed in {@code unit}
+ */
+ void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 7b33524..b647bbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -38,14 +38,18 @@ import org.apache.flink.runtime.rpc.StartStoppable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
import javax.annotation.concurrent.ThreadSafe;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -199,4 +203,18 @@ public class AkkaRpcService implements RpcService {
throw new IllegalArgumentException("Cannot get address for non " + className + '.');
}
}
+
+ @Override
+ public ExecutionContext getExecutionContext() {
+ return actorSystem.dispatcher();
+ }
+
+ @Override
+ public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
+ checkNotNull(runnable, "runnable");
+ checkNotNull(unit, "unit");
+ checkArgument(delay >= 0, "delay must be zero or larger");
+
+ actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, getExecutionContext());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
index c18906c..ce4f9d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
@@ -36,6 +36,7 @@ public final class RunAsync implements Serializable {
private final long delay;
/**
+ * Creates a new {@code RunAsync} message.
*
* @param runnable The Runnable to run.
* @param delay The delay in milliseconds. Zero indicates immediate execution.
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
new file mode 100644
index 0000000..2de560a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.registration;
+
+import java.io.Serializable;
+
+/**
+ * Base class for responses given to registration attempts from {@link RetryingRegistration}.
+ */
+public abstract class RegistrationResponse implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // ----------------------------------------------------------------------------
+
+ /**
+ * Base class for a successful registration. Concrete registration implementations
+ * will typically extend this class to attach more information.
+ */
+ public static class Success extends RegistrationResponse {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String toString() {
+ return "Registration Successful";
+ }
+ }
+
+ // ----------------------------------------------------------------------------
+
+ /**
+ * A rejected (declined) registration.
+ */
+ public static final class Decline extends RegistrationResponse {
+ private static final long serialVersionUID = 1L;
+
+ /** the rejection reason */
+ private final String reason;
+
+ /**
+ * Creates a new rejection message.
+ *
+ * @param reason The reason for the rejection.
+ */
+ public Decline(String reason) {
+ this.reason = reason != null ? reason : "(unknown)";
+ }
+
+ /**
+ * Gets the reason for the rejection.
+ */
+ public String getReason() {
+ return reason;
+ }
+
+ @Override
+ public String toString() {
+ return "Registration Declined (" + reason + ')';
+ }
+ }
+}
+
+
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
new file mode 100644
index 0000000..4c93684
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.registration;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * This utility class implements the basis of registering one component at another component,
+ * for example registering the TaskExecutor at the ResourceManager.
+ * This {@code RetryingRegistration} implements both the initial address resolution
+ * and the retries-with-backoff strategy.
+ *
+ * <p>The registration gives access to a future that is completed upon successful registration.
+ * The registration can be canceled, for example when the target it tries to register
+ * at loses leader status.
+ *
+ * @param <Gateway> The type of the gateway to connect to.
+ * @param <Success> The type of the successful registration responses.
+ */
+public abstract class RetryingRegistration<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> {
+
+ // ------------------------------------------------------------------------
+ // default configuration values
+ // ------------------------------------------------------------------------
+
+ private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
+
+ private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
+
+ private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
+
+ private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
+
+ // ------------------------------------------------------------------------
+ // Fields
+ // ------------------------------------------------------------------------
+
+ private final Logger log;
+
+ private final RpcService rpcService;
+
+ private final String targetName;
+
+ private final Class<Gateway> targetType;
+
+ private final String targetAddress;
+
+ private final UUID leaderId;
+
+ private final Promise<Tuple2<Gateway, Success>> completionPromise;
+
+ private final long initialRegistrationTimeout;
+
+ private final long maxRegistrationTimeout;
+
+ private final long delayOnError;
+
+ private final long delayOnRefusedRegistration;
+
+ private volatile boolean canceled;
+
+ // ------------------------------------------------------------------------
+
+ public RetryingRegistration(
+ Logger log,
+ RpcService rpcService,
+ String targetName,
+ Class<Gateway> targetType,
+ String targetAddress,
+ UUID leaderId) {
+ this(log, rpcService, targetName, targetType, targetAddress, leaderId,
+ INITIAL_REGISTRATION_TIMEOUT_MILLIS, MAX_REGISTRATION_TIMEOUT_MILLIS,
+ ERROR_REGISTRATION_DELAY_MILLIS, REFUSED_REGISTRATION_DELAY_MILLIS);
+ }
+
+ public RetryingRegistration(
+ Logger log,
+ RpcService rpcService,
+ String targetName,
+ Class<Gateway> targetType,
+ String targetAddress,
+ UUID leaderId,
+ long initialRegistrationTimeout,
+ long maxRegistrationTimeout,
+ long delayOnError,
+ long delayOnRefusedRegistration) {
+
+ checkArgument(initialRegistrationTimeout > 0, "initial registration timeout must be greater than zero");
+ checkArgument(maxRegistrationTimeout > 0, "maximum registration timeout must be greater than zero");
+ checkArgument(delayOnError >= 0, "delay on error must be non-negative");
+ checkArgument(delayOnRefusedRegistration >= 0, "delay on refused registration must be non-negative");
+
+ this.log = checkNotNull(log);
+ this.rpcService = checkNotNull(rpcService);
+ this.targetName = checkNotNull(targetName);
+ this.targetType = checkNotNull(targetType);
+ this.targetAddress = checkNotNull(targetAddress);
+ this.leaderId = checkNotNull(leaderId);
+ this.initialRegistrationTimeout = initialRegistrationTimeout;
+ this.maxRegistrationTimeout = maxRegistrationTimeout;
+ this.delayOnError = delayOnError;
+ this.delayOnRefusedRegistration = delayOnRefusedRegistration;
+
+ this.completionPromise = new DefaultPromise<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // completion and cancellation
+ // ------------------------------------------------------------------------
+
+ public Future<Tuple2<Gateway, Success>> getFuture() {
+ return completionPromise.future();
+ }
+
+ /**
+ * Cancels the registration procedure.
+ */
+ public void cancel() {
+ canceled = true;
+ }
+
+ /**
+ * Checks if the registration was canceled.
+ * @return True if the registration was canceled, false otherwise.
+ */
+ public boolean isCanceled() {
+ return canceled;
+ }
+
+ // ------------------------------------------------------------------------
+ // registration
+ // ------------------------------------------------------------------------
+
+ protected abstract Future<RegistrationResponse> invokeRegistration(
+ Gateway gateway, UUID leaderId, long timeoutMillis) throws Exception;
+
+ /**
+ * This method resolves the target address to a callable gateway and starts the
+ * registration after that.
+ */
+ @SuppressWarnings("unchecked")
+ public void startRegistration() {
+ try {
+ // trigger resolution of the target address to a callable gateway
+ Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);
+
+ // upon success, start the registration attempts
+ resourceManagerFuture.onSuccess(new OnSuccess<Gateway>() {
+ @Override
+ public void onSuccess(Gateway result) {
+ log.info("Resolved {} address, beginning registration", targetName);
+ register(result, 1, initialRegistrationTimeout);
+ }
+ }, rpcService.getExecutionContext());
+
+ // upon failure, retry, unless this is cancelled
+ resourceManagerFuture.onFailure(new OnFailure() {
+ @Override
+ public void onFailure(Throwable failure) {
+ if (!isCanceled()) {
+ log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress);
+ startRegistration();
+ }
+ }
+ }, rpcService.getExecutionContext());
+ }
+ catch (Throwable t) {
+ cancel();
+ completionPromise.tryFailure(t);
+ }
+ }
+
+ /**
+ * This method performs a registration attempt and triggers either a success notification or a retry,
+ * depending on the result.
+ */
+ @SuppressWarnings("unchecked")
+ private void register(final Gateway gateway, final int attempt, final long timeoutMillis) {
+ // eager check for canceling to avoid some unnecessary work
+ if (canceled) {
+ return;
+ }
+
+ try {
+ log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
+ Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);
+
+ // if the registration was successful, let the TaskExecutor know
+ registrationFuture.onSuccess(new OnSuccess<RegistrationResponse>() {
+
+ @Override
+ public void onSuccess(RegistrationResponse result) throws Throwable {
+ if (!isCanceled()) {
+ if (result instanceof RegistrationResponse.Success) {
+ // registration successful!
+ Success success = (Success) result;
+ completionPromise.success(new Tuple2<>(gateway, success));
+ }
+ else {
+ // registration refused or unknown
+ if (result instanceof RegistrationResponse.Decline) {
+ RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
+ log.info("Registration at {} was declined: {}", targetName, decline.getReason());
+ } else {
+ log.error("Received unknown response to registration attempt: " + result);
+ }
+
+ log.info("Pausing and re-attempting registration in {} ms", delayOnRefusedRegistration);
+ registerLater(gateway, 1, initialRegistrationTimeout, delayOnRefusedRegistration);
+ }
+ }
+ }
+ }, rpcService.getExecutionContext());
+
+ // upon failure, retry
+ registrationFuture.onFailure(new OnFailure() {
+ @Override
+ public void onFailure(Throwable failure) {
+ if (!isCanceled()) {
+ if (failure instanceof TimeoutException) {
+ // we simply have not received a response in time. maybe the timeout was
+ // very low (initial fast registration attempts), maybe the target endpoint is
+ // currently down.
+ if (log.isDebugEnabled()) {
+ log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
+ targetName, targetAddress, attempt, timeoutMillis);
+ }
+
+ long newTimeoutMillis = Math.min(2 * timeoutMillis, maxRegistrationTimeout);
+ register(gateway, attempt + 1, newTimeoutMillis);
+ }
+ else {
+ // a serious failure occurred. we still should not give up, but keep trying
+ log.error("Registration at " + targetName + " failed due to an error", failure);
+ log.info("Pausing and re-attempting registration in {} ms", delayOnError);
+
+ registerLater(gateway, 1, initialRegistrationTimeout, delayOnError);
+ }
+ }
+ }
+ }, rpcService.getExecutionContext());
+ }
+ catch (Throwable t) {
+ cancel();
+ completionPromise.tryFailure(t);
+ }
+ }
+
+ private void registerLater(final Gateway gateway, final int attempt, final long timeoutMillis, long delay) {
+ rpcService.scheduleRunnable(new Runnable() {
+ @Override
+ public void run() {
+ register(gateway, attempt, timeoutMillis);
+ }
+ }, delay, TimeUnit.MILLISECONDS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index 729ef0c..6f34465 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -19,19 +19,24 @@
package org.apache.flink.runtime.rpc.resourcemanager;
import akka.dispatch.Mapper;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.util.Preconditions;
+
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
/**
@@ -93,4 +98,22 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
System.out.println("SlotRequest: " + slotRequest);
return new SlotAssignment();
}
+
+
+ /**
+ * Handles the registration of a TaskExecutor. NOTE(review): the arguments are not inspected yet; every call reports success.
+ * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+ * @param taskExecutorAddress The address of the TaskExecutor that registers
+ * @param resourceID The resource ID of the TaskExecutor that registers
+ *
+ * @return The response by the ResourceManager.
+ */
+ @RpcMethod
+ public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
+ UUID resourceManagerLeaderId,
+ String taskExecutorAddress,
+ ResourceID resourceID) {
+
+ return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000); // fresh registration ID; 5000 is passed as the heartbeat interval
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
index 464a261..afddb01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
@@ -18,14 +18,18 @@
package org.apache.flink.runtime.rpc.resourcemanager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
+import java.util.UUID;
+
/**
- * {@link ResourceManager} rpc gateway interface.
+ * The {@link ResourceManager}'s RPC gateway interface.
*/
public interface ResourceManagerGateway extends RpcGateway {
@@ -55,4 +59,19 @@ public interface ResourceManagerGateway extends RpcGateway {
* @return Future slot assignment
*/
Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+
+ /**
+ * Registers a TaskExecutor at the ResourceManager.
+ * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+ * @param taskExecutorAddress The address of the TaskExecutor that registers
+ * @param resourceID The resource ID of the TaskExecutor that registers
+ * @param timeout The timeout for the response.
+ *
+ * @return The future to the response by the ResourceManager.
+ */
+ Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
+ UUID resourceManagerLeaderId,
+ String taskExecutorAddress,
+ ResourceID resourceID,
+ @RpcTimeout FiniteDuration timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
new file mode 100644
index 0000000..e42fa4a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import java.io.Serializable;
+
+/**
+ * A report about the current status of all slots of the TaskExecutor: which slots are
+ * available or allocated, and to which jobs (JobManagers) the allocated slots belong.
+ * NOTE(review): the class declares no fields yet, so no slot data is actually carried.
+ */
+public class SlotReport implements Serializable{
+
+ private static final long serialVersionUID = 1L;
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "SlotReport";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
index 3a7dd9f..1a637bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -18,67 +18,152 @@
package org.apache.flink.runtime.rpc.taskexecutor;
-import akka.dispatch.ExecutionContexts$;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.ExecutionContext;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* TaskExecutor implementation. The task executor is responsible for the execution of multiple
* {@link org.apache.flink.runtime.taskmanager.Task}.
- *
- * It offers the following methods as part of its rpc interface to interact with him remotely:
- * <ul>
- * <li>{@link #executeTask(TaskDeploymentDescriptor)} executes a given task on the TaskExecutor</li>
- * <li>{@link #cancelTask(ExecutionAttemptID)} cancels a given task identified by the {@link ExecutionAttemptID}</li>
- * </ul>
*/
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
- private final ExecutionContext executionContext;
- private final Set<ExecutionAttemptID> tasks = new HashSet<>();
- public TaskExecutor(RpcService rpcService, ExecutorService executorService) {
+ /** The unique resource ID of this TaskExecutor */
+ private final ResourceID resourceID;
+
+ /** The access to the leader election and metadata storage services */
+ private final HighAvailabilityServices haServices;
+
+ // --------- resource manager --------
+
+ private TaskExecutorToResourceManagerConnection resourceManagerConnection;
+
+ // ------------------------------------------------------------------------
+
+ public TaskExecutor(
+ RpcService rpcService,
+ HighAvailabilityServices haServices,
+ ResourceID resourceID) {
+
super(rpcService);
- this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(
- Preconditions.checkNotNull(executorService));
+
+ this.haServices = checkNotNull(haServices);
+ this.resourceID = checkNotNull(resourceID);
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ public ResourceID getResourceID() {
+ return resourceID;
+ }
+
+ // ------------------------------------------------------------------------
+ // Life cycle
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void start() {
+ // start by connecting to the ResourceManager (NOTE(review): super.start() is not called here - confirm that this is intended)
+ try {
+ haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener());
+ } catch (Exception e) {
+ onFatalErrorAsync(e); // handed over via runAsync instead of being handled on the calling thread
+ }
+ }
+
+
+ // ------------------------------------------------------------------------
+ // RPC methods - ResourceManager related
+ // ------------------------------------------------------------------------
+
+ @RpcMethod
+ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) {
+ if (resourceManagerConnection != null) {
+ if (newLeaderAddress != null) {
+ // the resource manager switched to a new leader
+ log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
+ resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
+ }
+ else {
+ // address null means that the current leader is lost without a new leader being there, yet
+ log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+ resourceManagerConnection.getResourceManagerAddress());
+ }
+
+ // drop the current connection or connection attempt. The enclosing check already
+ // guarantees a non-null connection, so it is closed without testing it again;
+ // clearing the field marks that no ResourceManager connection exists for now.
+ resourceManagerConnection.close();
+ resourceManagerConnection = null;
+ }
+
+ // establish a connection to the new leader
+ if (newLeaderAddress != null) {
+ log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
+ resourceManagerConnection =
+ new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId);
+ resourceManagerConnection.start();
+ }
}
+ // ------------------------------------------------------------------------
+ // Error handling
+ // ------------------------------------------------------------------------
+
/**
- * Execute the given task on the task executor. The task is described by the provided
- * {@link TaskDeploymentDescriptor}.
- *
- * @param taskDeploymentDescriptor Descriptor for the task to be executed
- * @return Acknowledge the start of the task execution
+ * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
+ * This method should be used when asynchronous threads want to notify the
+ * TaskExecutor of a fatal error.
+ *
+ * @param t The exception describing the fatal error
*/
- @RpcMethod
- public Acknowledge executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
- tasks.add(taskDeploymentDescriptor.getExecutionId());
- return Acknowledge.get();
+ void onFatalErrorAsync(final Throwable t) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ onFatalError(t);
+ }
+ });
}
/**
- * Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then
- * the method throws an {@link Exception}.
- *
- * @param executionAttemptId Execution attempt ID identifying the task to be canceled.
- * @return Acknowledge the task canceling
- * @throws Exception if the task with the given execution attempt id could not be found
+ * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
+ * This method must only be called from within the TaskExecutor's main thread.
+ *
+ * @param t The exception describing the fatal error
*/
- @RpcMethod
- public Acknowledge cancelTask(ExecutionAttemptID executionAttemptId) throws Exception {
- if (tasks.contains(executionAttemptId)) {
- return Acknowledge.get();
- } else {
- throw new Exception("Could not find task.");
+ void onFatalError(Throwable t) {
+ // to be determined, probably delegate to a fatal error handler that
+ // would either log (mini cluster) or kill the process (yarn, mesos, ...)
+ log.error("FATAL ERROR", t);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utility classes
+ // ------------------------------------------------------------------------
+
+ /**
+ * The listener for leader changes of the resource manager
+ */
+ private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
+
+ @Override
+ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+ getSelf().notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+ }
+
+ @Override
+ public void handleError(Exception exception) {
+ onFatalErrorAsync(exception);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
index 450423e..b0b21b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
@@ -18,31 +18,18 @@
package org.apache.flink.runtime.rpc.taskexecutor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
-import scala.concurrent.Future;
+
+import java.util.UUID;
/**
- * {@link TaskExecutor} rpc gateway interface
+ * {@link TaskExecutor} RPC gateway interface
*/
public interface TaskExecutorGateway extends RpcGateway {
- /**
- * Execute the given task on the task executor. The task is described by the provided
- * {@link TaskDeploymentDescriptor}.
- *
- * @param taskDeploymentDescriptor Descriptor for the task to be executed
- * @return Future acknowledge of the start of the task execution
- */
- Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor);
- /**
- * Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then
- * the method throws an {@link Exception}.
- *
- * @param executionAttemptId Execution attempt ID identifying the task to be canceled.
- * @return Future acknowledge of the task canceling
- */
- Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId);
+ // ------------------------------------------------------------------------
+ // ResourceManager handlers
+ // ------------------------------------------------------------------------
+
+ void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId); // a null address signals that there is currently no ResourceManager leader
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java
new file mode 100644
index 0000000..641102d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.registration.RegistrationResponse;
+
+import java.io.Serializable;
+
+/**
+ * Success response returned by the ResourceManager for a registration attempt by a TaskExecutor.
+ * It carries the registration ID assigned by the ResourceManager and the heartbeat interval.
+ */
+public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.Success implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final InstanceID registrationId;
+
+ private final long heartbeatInterval;
+
+ /**
+ * Create a new {@code TaskExecutorRegistrationSuccess} message.
+ *
+ * @param registrationId The ID that the ResourceManager assigned the registration.
+ * @param heartbeatInterval The interval in which the ResourceManager will heartbeat the TaskExecutor.
+ */
+ public TaskExecutorRegistrationSuccess(InstanceID registrationId, long heartbeatInterval) {
+ this.registrationId = registrationId;
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ /**
+ * Gets the ID that the ResourceManager assigned the registration.
+ */
+ public InstanceID getRegistrationId() {
+ return registrationId;
+ }
+
+ /**
+ * Gets the interval in which the ResourceManager will heartbeat the TaskExecutor.
+ */
+ public long getHeartbeatInterval() {
+ return heartbeatInterval;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')';
+ }
+
+}
+
+
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
new file mode 100644
index 0000000..ef75862
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.registration.RetryingRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class TaskExecutorToResourceManagerConnection {
+
+ /** the logger for all log messages of this class */
+ private final Logger log;
+
+ /** the TaskExecutor whose connection to the ResourceManager this represents */
+ private final TaskExecutor taskExecutor;
+
+ private final UUID resourceManagerLeaderId;
+
+ private final String resourceManagerAddress;
+
+ private ResourceManagerRegistration pendingRegistration;
+
+ private ResourceManagerGateway registeredResourceManager;
+
+ private InstanceID registrationId;
+
+ /** flag indicating that the connection is closed */
+ private volatile boolean closed;
+
+
+ public TaskExecutorToResourceManagerConnection(
+ Logger log,
+ TaskExecutor taskExecutor,
+ String resourceManagerAddress,
+ UUID resourceManagerLeaderId) {
+
+ this.log = checkNotNull(log);
+ this.taskExecutor = checkNotNull(taskExecutor);
+ this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
+ this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
+ }
+
+ // ------------------------------------------------------------------------
+ // Life cycle
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ public void start() {
+ checkState(!closed, "The connection is already closed");
+ checkState(!isRegistered() && pendingRegistration == null, "The connection is already started");
+ // keep the attempt in the field: close() cancels it through pendingRegistration and the check above detects a second start()
+ pendingRegistration = new ResourceManagerRegistration(
+ log, taskExecutor.getRpcService(),
+ resourceManagerAddress, resourceManagerLeaderId,
+ taskExecutor.getAddress(), taskExecutor.getResourceID());
+ // NOTE(review): nothing here triggers the first registration attempt - confirm that RetryingRegistration starts on its own
+ Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
+
+ future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
+ @Override
+ public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
+ registeredResourceManager = result.f0;
+ registrationId = result.f1.getRegistrationId();
+ }
+ }, taskExecutor.getMainThreadExecutionContext());
+
+ // this future should only ever fail if there is a bug, not if the registration is declined
+ future.onFailure(new OnFailure() {
+ @Override
+ public void onFailure(Throwable failure) {
+ taskExecutor.onFatalError(failure);
+ }
+ }, taskExecutor.getMainThreadExecutionContext());
+ }
+
+ public void close() {
+ closed = true;
+
+ // make sure we do not keep re-trying forever
+ if (pendingRegistration != null) {
+ pendingRegistration.cancel();
+ }
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ public UUID getResourceManagerLeaderId() {
+ return resourceManagerLeaderId;
+ }
+
+ public String getResourceManagerAddress() {
+ return resourceManagerAddress;
+ }
+
+ /**
+ * Gets the ResourceManagerGateway. This returns null until the registration is completed.
+ */
+ public ResourceManagerGateway getResourceManager() {
+ return registeredResourceManager;
+ }
+
+ /**
+ * Gets the ID under which the TaskExecutor is registered at the ResourceManager.
+ * This returns null until the registration is completed.
+ */
+ public InstanceID getRegistrationId() {
+ return registrationId;
+ }
+
+ public boolean isRegistered() {
+ return registeredResourceManager != null;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return String.format("Connection to ResourceManager %s (leaderId=%s)",
+ resourceManagerAddress, resourceManagerLeaderId);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ static class ResourceManagerRegistration
+ extends RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
+
+ private final String taskExecutorAddress;
+
+ private final ResourceID resourceID;
+
+ public ResourceManagerRegistration(
+ Logger log,
+ RpcService rpcService,
+ String targetAddress,
+ UUID leaderId,
+ String taskExecutorAddress,
+ ResourceID resourceID) {
+
+ super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId);
+ this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
+ this.resourceID = checkNotNull(resourceID);
+ }
+
+ @Override
+ protected Future<RegistrationResponse> invokeRegistration(
+ ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
+
+ FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS);
+ return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index fd55904..7b4ab89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -20,15 +20,17 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
import akka.util.Timeout;
+
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
import org.junit.Test;
+
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
@@ -41,6 +43,49 @@ import static org.junit.Assert.assertTrue;
public class AkkaRpcServiceTest extends TestLogger {
+ // ------------------------------------------------------------------------
+ // shared test members
+ // ------------------------------------------------------------------------
+
+ private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+ private static AkkaRpcService akkaRpcService =
+ new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+
+ @AfterClass
+ public static void shutdown() {
+ akkaRpcService.stopService();
+ actorSystem.shutdown();
+ }
+
+ // ------------------------------------------------------------------------
+ // tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testScheduleRunnable() throws Exception {
+ final OneShotLatch latch = new OneShotLatch();
+ final long delay = 100;
+ final long start = System.nanoTime();
+
+ akkaRpcService.scheduleRunnable(new Runnable() {
+ @Override
+ public void run() {
+ latch.trigger();
+ }
+ }, delay, TimeUnit.MILLISECONDS); // 'delay' is passed as milliseconds
+
+ latch.await(); // returns once the scheduled runnable has triggered the latch
+ final long stop = System.nanoTime();
+
+ assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay); // elapsed nanoseconds converted to milliseconds
+ }
+
+ // ------------------------------------------------------------------------
+ // specific component tests - should be moved to the test classes
+ // for those components
+ // ------------------------------------------------------------------------
+
/**
* Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the
* {@link AkkaRpcService}.
http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
index c96f4f6..9f9bab3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -18,93 +18,8 @@
package org.apache.flink.runtime.rpc.taskexecutor;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.StartStoppable;
-import org.apache.flink.runtime.util.DirectExecutorService;
-import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.cglib.proxy.InvocationHandler;
-import org.mockito.cglib.proxy.Proxy;
-import scala.concurrent.Future;
-
-import java.net.URL;
-import java.util.Collections;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class TaskExecutorTest extends TestLogger {
-
- /**
- * Tests that we can deploy and cancel a task on the TaskExecutor without exceptions
- */
- @Test
- public void testTaskExecution() throws Exception {
- RpcService testingRpcService = mock(RpcService.class);
- InvocationHandler invocationHandler = mock(InvocationHandler.class);
- Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler);
- when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
-
- DirectExecutorService directExecutorService = new DirectExecutorService();
- TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
- taskExecutor.start();
-
- TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
- new JobID(),
- "Test job",
- new JobVertexID(),
- new ExecutionAttemptID(),
- new SerializedValue<ExecutionConfig>(null),
- "Test task",
- 0,
- 1,
- 0,
- new Configuration(),
- new Configuration(),
- "Invokable",
- Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
- Collections.<InputGateDeploymentDescriptor>emptyList(),
- Collections.<BlobKey>emptyList(),
- Collections.<URL>emptyList(),
- 0
- );
-
- Acknowledge ack = taskExecutor.executeTask(tdd);
-
- ack = taskExecutor.cancelTask(tdd.getExecutionId());
- }
-
- /**
- * Tests that cancelling a non-existing task will return an exception
- */
- @Test(expected=Exception.class)
- public void testWrongTaskCancellation() throws Exception {
- RpcService testingRpcService = mock(RpcService.class);
- InvocationHandler invocationHandler = mock(InvocationHandler.class);
- Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler);
- when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
- DirectExecutorService directExecutorService = null;
- TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
- taskExecutor.start();
-
- taskExecutor.cancelTask(new ExecutionAttemptID());
-
- fail("The cancellation should have thrown an exception.");
- }
+
}