You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:10 UTC

[05/50] [abbrv] flink git commit: [FLINK-4355] [cluster management] Implement TaskManager side of registration at ResourceManager.

[FLINK-4355] [cluster management] Implement TaskManager side of registration at ResourceManager.

This closes #2353


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24eab760
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24eab760
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24eab760

Branch: refs/heads/flip-6
Commit: 24eab760c6543f2dc8b83ff85dfb94670fa83444
Parents: 59cc176
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 10 20:42:45 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:41 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServices.java               |  39 +++
 .../runtime/highavailability/NonHaServices.java |  59 ++++
 .../StandaloneLeaderRetrievalService.java       |  72 +++--
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |   1 -
 .../apache/flink/runtime/rpc/RpcService.java    |  27 ++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  18 ++
 .../runtime/rpc/akka/messages/RunAsync.java     |   1 +
 .../rpc/registration/RegistrationResponse.java  |  84 ++++++
 .../rpc/registration/RetryingRegistration.java  | 292 +++++++++++++++++++
 .../rpc/resourcemanager/ResourceManager.java    |  23 ++
 .../resourcemanager/ResourceManagerGateway.java |  21 +-
 .../runtime/rpc/taskexecutor/SlotReport.java    |  38 +++
 .../runtime/rpc/taskexecutor/TaskExecutor.java  | 169 ++++++++---
 .../rpc/taskexecutor/TaskExecutorGateway.java   |  29 +-
 .../TaskExecutorRegistrationSuccess.java        |  75 +++++
 ...TaskExecutorToResourceManagerConnection.java | 194 ++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  51 +++-
 .../rpc/taskexecutor/TaskExecutorTest.java      |  87 +-----
 18 files changed, 1105 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
new file mode 100644
index 0000000..094d36f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * This class gives access to all services needed for
+ *
+ * <ul>
+ *     <li>ResourceManager leader election and leader retrieval</li>
+ *     <li>JobManager leader election and leader retrieval</li>
+ *     <li>Persistence for checkpoint metadata</li>
+ *     <li>Registering the latest completed checkpoint(s)</li>
+ * </ul>
+ */
+public interface HighAvailabilityServices {
+
+	/**
+	 * Gets the leader retriever for the cluster's resource manager.
+	 */
+	LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
new file mode 100644
index 0000000..b8c2ed8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case.
+ * This implementation can be used for testing, and for cluster setups that do not
+ * tolerate failures of the master processes (JobManager, ResourceManager).
+ * 
+ * <p>This implementation has no dependencies on any external services. It returns fix
+ * pre-configured leaders, and stores checkpoints and metadata simply on the heap and therefore
+ * in volatile memory.
+ */
+public class NonHaServices implements HighAvailabilityServices {
+
+	/** The fix address of the ResourceManager */
+	private final String resourceManagerAddress;
+
+	/**
+	 * Creates a new services class for the fix pre-defined leaders.
+	 * 
+	 * @param resourceManagerAddress    The fix address of the ResourceManager
+	 */
+	public NonHaServices(String resourceManagerAddress) {
+		this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+		return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index 26a34aa..16b163c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -18,44 +18,74 @@
 
 package org.apache.flink.runtime.leaderretrieval;
 
-import org.apache.flink.util.Preconditions;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Standalone implementation of the {@link LeaderRetrievalService}. The standalone implementation
- * assumes that there is only a single {@link org.apache.flink.runtime.jobmanager.JobManager} whose
- * address is given to the service when creating it. This address is directly given to the
- * {@link LeaderRetrievalListener} when the service is started.
+ * Standalone implementation of the {@link LeaderRetrievalService}. This implementation
+ * assumes that there is only a single contender for leadership
+ * (e.g., a single JobManager or ResourceManager process) and that this process is
+ * reachable under a constant address.
+ * 
+ * <p>As soon as this service is started, it immediately notifies the leader listener
+ * of the leader contender with the pre-configured address.
  */
 public class StandaloneLeaderRetrievalService implements LeaderRetrievalService {
 
-	/** Address of the only JobManager */
-	private final String jobManagerAddress;
+	private final Object startStopLock = new Object();
+	
+	/** The fix address of the leader */
+	private final String leaderAddress;
+
+	/** The fix leader ID (leader lock fencing token) */
+	private final UUID leaderId;
 
-	/** Listener which wants to be notified about the new leader */
-	private LeaderRetrievalListener leaderListener;
+	/** Flag whether this service is started */
+	private boolean started;
 
 	/**
-	 * Creates a StandaloneLeaderRetrievalService with the given JobManager address.
+	 * Creates a StandaloneLeaderRetrievalService with the given leader address.
+	 * The leaderId will be null.
 	 *
-	 * @param jobManagerAddress The JobManager's address which is returned to the
-	 * 							{@link LeaderRetrievalListener}
+	 * @param leaderAddress The leader's pre-configured address
 	 */
-	public StandaloneLeaderRetrievalService(String jobManagerAddress) {
-		this.jobManagerAddress = jobManagerAddress;
+	public StandaloneLeaderRetrievalService(String leaderAddress) {
+		this.leaderAddress = checkNotNull(leaderAddress);
+		this.leaderId = null;
 	}
 
+	/**
+	 * Creates a StandaloneLeaderRetrievalService with the given leader address.
+	 *
+	 * @param leaderAddress The leader's pre-configured address
+	 * @param leaderId      The constant leaderId.
+	 */
+	public StandaloneLeaderRetrievalService(String leaderAddress, UUID leaderId) {
+		this.leaderAddress = checkNotNull(leaderAddress);
+		this.leaderId = checkNotNull(leaderId);
+	}
+
+	// ------------------------------------------------------------------------
+
 	@Override
 	public void start(LeaderRetrievalListener listener) {
-		Preconditions.checkNotNull(listener, "Listener must not be null.");
-		Preconditions.checkState(leaderListener == null, "StandaloneLeaderRetrievalService can " +
-				"only be started once.");
+		checkNotNull(listener, "Listener must not be null.");
 
-		leaderListener = listener;
+		synchronized (startStopLock) {
+			checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
+			started = true;
 
-		// directly notify the listener, because we already know the leading JobManager's address
-		leaderListener.notifyLeaderAddress(jobManagerAddress, null);
+			// directly notify the listener, because we already know the leading JobManager's address
+			listener.notifyLeaderAddress(leaderAddress, leaderId);
+		}
 	}
 
 	@Override
-	public void stop() {}
+	public void stop() {
+		synchronized (startStopLock) {
+			started = false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 67ac182..a28bc14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -237,7 +237,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * }</pre>
 	 */
 	public void validateRunsInMainThread() {
-		// because the initialization is lazy, it can be that certain methods are
 		assert currentMainThread.get() == Thread.currentThread();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index f93be83..fabdb05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.rpc;
 
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Interface for rpc services. An rpc service is used to start and connect to a {@link RpcEndpoint}.
  * Connecting to a rpc server will return a {@link RpcGateway} which can be used to call remote
@@ -71,4 +74,28 @@ public interface RpcService {
 	 * @return Fully qualified address
 	 */
 	<C extends RpcGateway> String getAddress(C selfGateway);
+
+	/**
+	 * Gets the execution context, provided by this RPC service. This execution
+	 * context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)}
+	 * methods of Futures.
+	 * 
+	 * <p><b>IMPORTANT:</b> This execution context does not isolate the method invocations against
+	 * any concurrent invocations and is therefore not suitable to run completion methods of futures
+	 * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
+	 * {@link RpcEndpoint#getMainThreadExecutionContext() MainThreadExecutionContext} of that
+	 * {@code RpcEndpoint}.
+	 * 
+	 * @return The execution context provided by the RPC service
+	 */
+	ExecutionContext getExecutionContext();
+
+	/**
+	 * Execute the runnable in the execution context of this RPC Service, as returned by
+	 * {@link #getExecutionContext()}, after a scheduled delay.
+	 *
+	 * @param runnable Runnable to be executed
+	 * @param delay    The delay after which the runnable will be executed
+	 */
+	void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 7b33524..b647bbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -38,14 +38,18 @@ import org.apache.flink.runtime.rpc.StartStoppable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.concurrent.ThreadSafe;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -199,4 +203,18 @@ public class AkkaRpcService implements RpcService {
 			throw new IllegalArgumentException("Cannot get address for non " + className + '.');
 		}
 	}
+
+	@Override
+	public ExecutionContext getExecutionContext() {
+		return actorSystem.dispatcher();
+	}
+
+	@Override
+	public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
+		checkNotNull(runnable, "runnable");
+		checkNotNull(unit, "unit");
+		checkArgument(delay >= 0, "delay must be zero or larger");
+
+		actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, getExecutionContext());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
index c18906c..ce4f9d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
@@ -36,6 +36,7 @@ public final class RunAsync implements Serializable {
 	private final long delay;
 
 	/**
+	 * Creates a new {@code RunAsync} message.
 	 * 
 	 * @param runnable  The Runnable to run.
 	 * @param delay     The delay in milliseconds. Zero indicates immediate execution.

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
new file mode 100644
index 0000000..2de560a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.registration;
+
+import java.io.Serializable;
+
+/**
+ * Base class for responses given to registration attempts from {@link RetryingRegistration}.
+ */
+public abstract class RegistrationResponse implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// ----------------------------------------------------------------------------
+	
+	/**
+	 * Base class for a successful registration. Concrete registration implementations
+	 * will typically extend this class to attach more information.
+	 */
+	public static class Success extends RegistrationResponse {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public String toString() {
+			return "Registration Successful";
+		}
+	}
+
+	// ----------------------------------------------------------------------------
+
+	/**
+	 * A rejected (declined) registration.
+	 */
+	public static final class Decline extends RegistrationResponse {
+		private static final long serialVersionUID = 1L;
+
+		/** the rejection reason */
+		private final String reason;
+
+		/**
+		 * Creates a new rejection message.
+		 * 
+		 * @param reason The reason for the rejection.
+		 */
+		public Decline(String reason) {
+			this.reason = reason != null ? reason : "(unknown)";
+		}
+
+		/**
+		 * Gets the reason for the rejection.
+		 */
+		public String getReason() {
+			return reason;
+		}
+
+		@Override
+		public String toString() {
+			return "Registration Declined (" + reason + ')';
+		}
+	}
+}
+
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
new file mode 100644
index 0000000..4c93684
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.registration;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * This utility class implements the basis of registering one component at another component,
+ * for example registering the TaskExecutor at the ResourceManager.
+ * This {@code RetryingRegistration} implements both the initial address resolution
+ * and the retries-with-backoff strategy.
+ * 
+ * <p>The registration gives access to a future that is completed upon successful registration.
+ * The registration can be canceled, for example when the target where it tries to register
+ * at looses leader status.
+ * 
+ * @param <Gateway> The type of the gateway to connect to.
+ * @param <Success> The type of the successful registration responses.
+ */
+public abstract class RetryingRegistration<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> {
+
+	// ------------------------------------------------------------------------
+	//  default configuration values
+	// ------------------------------------------------------------------------
+
+	private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
+
+	private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
+
+	private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
+
+	private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
+
+	// ------------------------------------------------------------------------
+	// Fields
+	// ------------------------------------------------------------------------
+
+	private final Logger log;
+
+	private final RpcService rpcService;
+
+	private final String targetName;
+
+	private final Class<Gateway> targetType;
+
+	private final String targetAddress;
+
+	private final UUID leaderId;
+
+	private final Promise<Tuple2<Gateway, Success>> completionPromise;
+
+	private final long initialRegistrationTimeout;
+
+	private final long maxRegistrationTimeout;
+
+	private final long delayOnError;
+
+	private final long delayOnRefusedRegistration;
+
+	private volatile boolean canceled;
+
+	// ------------------------------------------------------------------------
+
+	public RetryingRegistration(
+			Logger log,
+			RpcService rpcService,
+			String targetName,
+			Class<Gateway> targetType,
+			String targetAddress,
+			UUID leaderId) {
+		this(log, rpcService, targetName, targetType, targetAddress, leaderId,
+				INITIAL_REGISTRATION_TIMEOUT_MILLIS, MAX_REGISTRATION_TIMEOUT_MILLIS,
+				ERROR_REGISTRATION_DELAY_MILLIS, REFUSED_REGISTRATION_DELAY_MILLIS);
+	}
+
+	public RetryingRegistration(
+			Logger log,
+			RpcService rpcService,
+			String targetName, 
+			Class<Gateway> targetType,
+			String targetAddress,
+			UUID leaderId,
+			long initialRegistrationTimeout,
+			long maxRegistrationTimeout,
+			long delayOnError,
+			long delayOnRefusedRegistration) {
+
+		checkArgument(initialRegistrationTimeout > 0, "initial registration timeout must be greater than zero");
+		checkArgument(maxRegistrationTimeout > 0, "maximum registration timeout must be greater than zero");
+		checkArgument(delayOnError >= 0, "delay on error must be non-negative");
+		checkArgument(delayOnRefusedRegistration >= 0, "delay on refused registration must be non-negative");
+
+		this.log = checkNotNull(log);
+		this.rpcService = checkNotNull(rpcService);
+		this.targetName = checkNotNull(targetName);
+		this.targetType = checkNotNull(targetType);
+		this.targetAddress = checkNotNull(targetAddress);
+		this.leaderId = checkNotNull(leaderId);
+		this.initialRegistrationTimeout = initialRegistrationTimeout;
+		this.maxRegistrationTimeout = maxRegistrationTimeout;
+		this.delayOnError = delayOnError;
+		this.delayOnRefusedRegistration = delayOnRefusedRegistration;
+
+		this.completionPromise = new DefaultPromise<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  completion and cancellation
+	// ------------------------------------------------------------------------
+
+	public Future<Tuple2<Gateway, Success>> getFuture() {
+		return completionPromise.future();
+	}
+
+	/**
+	 * Cancels the registration procedure.
+	 */
+	public void cancel() {
+		canceled = true;
+	}
+
+	/**
+	 * Checks if the registration was canceled.
+	 * @return True if the registration was canceled, false otherwise.
+	 */
+	public boolean isCanceled() {
+		return canceled;
+	}
+
+	// ------------------------------------------------------------------------
+	//  registration
+	// ------------------------------------------------------------------------
+
+	protected abstract Future<RegistrationResponse> invokeRegistration(
+			Gateway gateway, UUID leaderId, long timeoutMillis) throws Exception;
+
+	/**
+	 * This method resolves the target address to a callable gateway and starts the
+	 * registration after that.
+	 */
+	@SuppressWarnings("unchecked")
+	public void startRegistration() {
+		try {
+			// trigger resolution of the resource manager address to a callable gateway
+			Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);
+	
+			// upon success, start the registration attempts
+			resourceManagerFuture.onSuccess(new OnSuccess<Gateway>() {
+				@Override
+				public void onSuccess(Gateway result) {
+					log.info("Resolved {} address, beginning registration", targetName);
+					register(result, 1, initialRegistrationTimeout);
+				}
+			}, rpcService.getExecutionContext());
+	
+			// upon failure, retry, unless this is cancelled
+			resourceManagerFuture.onFailure(new OnFailure() {
+				@Override
+				public void onFailure(Throwable failure) {
+					if (!isCanceled()) {
+						log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress);
+						startRegistration();
+					}
+				}
+			}, rpcService.getExecutionContext());
+		}
+		catch (Throwable t) {
+			cancel();
+			completionPromise.tryFailure(t);
+		}
+	}
+
+	/**
+	 * This method performs a registration attempt and triggers either a success notification or a retry,
+	 * depending on the result.
+	 */
+	@SuppressWarnings("unchecked")
+	private void register(final Gateway gateway, final int attempt, final long timeoutMillis) {
+		// eager check for canceling to avoid some unnecessary work
+		if (canceled) {
+			return;
+		}
+
+		try {
+			log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
+			Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);
+	
+			// if the registration was successful, let the TaskExecutor know
+			registrationFuture.onSuccess(new OnSuccess<RegistrationResponse>() {
+				
+				@Override
+				public void onSuccess(RegistrationResponse result) throws Throwable {
+					if (!isCanceled()) {
+						if (result instanceof RegistrationResponse.Success) {
+							// registration successful!
+							Success success = (Success) result;
+							completionPromise.success(new Tuple2<>(gateway, success));
+						}
+						else {
+							// registration refused or unknown
+							if (result instanceof RegistrationResponse.Decline) {
+								RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
+								log.info("Registration at {} was declined: {}", targetName, decline.getReason());
+							} else {
+								log.error("Received unknown response to registration attempt: " + result);
+							}
+
+							log.info("Pausing and re-attempting registration in {} ms", delayOnRefusedRegistration);
+							registerLater(gateway, 1, initialRegistrationTimeout, delayOnRefusedRegistration);
+						}
+					}
+				}
+			}, rpcService.getExecutionContext());
+	
+			// upon failure, retry
+			registrationFuture.onFailure(new OnFailure() {
+				@Override
+				public void onFailure(Throwable failure) {
+					if (!isCanceled()) {
+						if (failure instanceof TimeoutException) {
+							// we simply have not received a response in time. maybe the timeout was
+							// very low (initial fast registration attempts), maybe the target endpoint is
+							// currently down.
+							if (log.isDebugEnabled()) {
+								log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
+										targetName, targetAddress, attempt, timeoutMillis);
+							}
+	
+							long newTimeoutMillis = Math.min(2 * timeoutMillis, maxRegistrationTimeout);
+							register(gateway, attempt + 1, newTimeoutMillis);
+						}
+						else {
+							// a serious failure occurred. we still should not give up, but keep trying
+							log.error("Registration at " + targetName + " failed due to an error", failure);
+							log.info("Pausing and re-attempting registration in {} ms", delayOnError);
+	
+							registerLater(gateway, 1, initialRegistrationTimeout, delayOnError);
+						}
+					}
+				}
+			}, rpcService.getExecutionContext());
+		}
+		catch (Throwable t) {
+			cancel();
+			completionPromise.tryFailure(t);
+		}
+	}
+
+	private void registerLater(final Gateway gateway, final int attempt, final long timeoutMillis, long delay) {
+		rpcService.scheduleRunnable(new Runnable() {
+			@Override
+			public void run() {
+				register(gateway, attempt, timeoutMillis);
+			}
+		}, delay, TimeUnit.MILLISECONDS);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index 729ef0c..6f34465 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -19,19 +19,24 @@
 package org.apache.flink.runtime.rpc.resourcemanager;
 
 import akka.dispatch.Mapper;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.util.Preconditions;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.Future;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -93,4 +98,22 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 		System.out.println("SlotRequest: " + slotRequest);
 		return new SlotAssignment();
 	}
+
+
+	/**
+	 *
+	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
+	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
+	 * @param resourceID               The resource ID of the TaskExecutor that registers
+	 *
+	 * @return The response by the ResourceManager.
+	 */
+	@RpcMethod
+	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
+			UUID resourceManagerLeaderId,
+			String taskExecutorAddress,
+			ResourceID resourceID) {
+
+		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
index 464a261..afddb01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
@@ -18,14 +18,18 @@
 
 package org.apache.flink.runtime.rpc.resourcemanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.UUID;
+
 /**
- * {@link ResourceManager} rpc gateway interface.
+ * The {@link ResourceManager}'s RPC gateway interface.
  */
 public interface ResourceManagerGateway extends RpcGateway {
 
@@ -55,4 +59,19 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @return Future slot assignment
 	 */
 	Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+
+	/**
+	 * 
+	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
+	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
+	 * @param resourceID               The resource ID of the TaskExecutor that registers
+	 * @param timeout                  The timeout for the response.
+	 * 
+	 * @return The future to the response by the ResourceManager.
+	 */
+	Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
+			UUID resourceManagerLeaderId,
+			String taskExecutorAddress,
+			ResourceID resourceID,
+			@RpcTimeout FiniteDuration timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
new file mode 100644
index 0000000..e42fa4a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import java.io.Serializable;
+
+/**
+ * A report about the current status of all slots of the TaskExecutor, describing
+ * which slots are available and allocated, and what jobs (JobManagers) the allocated slots
+ * have been allocated to.
+ */
+public class SlotReport implements Serializable{
+
+	private static final long serialVersionUID = 1L;
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return "SlotReport";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
index 3a7dd9f..1a637bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -18,67 +18,152 @@
 
 package org.apache.flink.runtime.rpc.taskexecutor;
 
-import akka.dispatch.ExecutionContexts$;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.ExecutionContext;
 
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * TaskExecutor implementation. The task executor is responsible for the execution of multiple
  * {@link org.apache.flink.runtime.taskmanager.Task}.
- *
- * It offers the following methods as part of its rpc interface to interact with him remotely:
- * <ul>
- *     <li>{@link #executeTask(TaskDeploymentDescriptor)} executes a given task on the TaskExecutor</li>
- *     <li>{@link #cancelTask(ExecutionAttemptID)} cancels a given task identified by the {@link ExecutionAttemptID}</li>
- * </ul>
  */
 public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
-	private final ExecutionContext executionContext;
-	private final Set<ExecutionAttemptID> tasks = new HashSet<>();
 
-	public TaskExecutor(RpcService rpcService, ExecutorService executorService) {
+	/** The unique resource ID of this TaskExecutor */
+	private final ResourceID resourceID;
+
+	/** The access to the leader election and metadata storage services */
+	private final HighAvailabilityServices haServices;
+
+	// --------- resource manager --------
+
+	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
+
+	// ------------------------------------------------------------------------
+
+	public TaskExecutor(
+			RpcService rpcService,
+			HighAvailabilityServices haServices,
+			ResourceID resourceID) {
+
 		super(rpcService);
-		this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(
-			Preconditions.checkNotNull(executorService));
+
+		this.haServices = checkNotNull(haServices);
+		this.resourceID = checkNotNull(resourceID);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Life cycle
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void start() {
+		// start by connecting to the ResourceManager
+		try {
+			haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener());
+		} catch (Exception e) {
+			onFatalErrorAsync(e);
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  RPC methods - ResourceManager related
+	// ------------------------------------------------------------------------
+
+	@RpcMethod
+	public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) {
+		if (resourceManagerConnection != null) {
+			if (newLeaderAddress != null) {
+				// the resource manager switched to a new leader
+				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
+						resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
+			}
+			else {
+				// address null means that the current leader is lost without a new leader being there, yet
+				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+						resourceManagerConnection.getResourceManagerAddress());
+			}
+
+			// drop the current connection or connection attempt
+			if (resourceManagerConnection != null) {
+				resourceManagerConnection.close();
+				resourceManagerConnection = null;
+			}
+		}
+
+		// establish a connection to the new leader
+		if (newLeaderAddress != null) {
+			log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
+			resourceManagerConnection = 
+					new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId);
+			resourceManagerConnection.start();
+		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Error handling
+	// ------------------------------------------------------------------------
+
 	/**
-	 * Execute the given task on the task executor. The task is described by the provided
-	 * {@link TaskDeploymentDescriptor}.
-	 *
-	 * @param taskDeploymentDescriptor Descriptor for the task to be executed
-	 * @return Acknowledge the start of the task execution
+	 * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
+	 * This method should be used when asynchronous threads want to notify the
+	 * TaskExecutor of a fatal error.
+	 * 
+	 * @param t The exception describing the fatal error
 	 */
-	@RpcMethod
-	public Acknowledge executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
-		tasks.add(taskDeploymentDescriptor.getExecutionId());
-		return Acknowledge.get();
+	void onFatalErrorAsync(final Throwable t) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				onFatalError(t);
+			}
+		});
 	}
 
 	/**
-	 * Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then
-	 * the method throws an {@link Exception}.
-	 *
-	 * @param executionAttemptId Execution attempt ID identifying the task to be canceled.
-	 * @return Acknowledge the task canceling
-	 * @throws Exception if the task with the given execution attempt id could not be found
+	 * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed.
+	 * This method must only be called from within the TaskExecutor's main thread.
+	 * 
+	 * @param t The exception describing the fatal error
 	 */
-	@RpcMethod
-	public Acknowledge cancelTask(ExecutionAttemptID executionAttemptId) throws Exception {
-		if (tasks.contains(executionAttemptId)) {
-			return Acknowledge.get();
-		} else {
-			throw new Exception("Could not find task.");
+	void onFatalError(Throwable t) {
+		// to be determined, probably delegate to a fatal error handler that 
+		// would either log (mini cluster) ot kill the process (yarn, mesos, ...)
+		log.error("FATAL ERROR", t);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utility classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The listener for leader changes of the resource manager
+	 */
+	private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
+
+		@Override
+		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+			getSelf().notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+		}
+
+		@Override
+		public void handleError(Exception exception) {
+			onFatalErrorAsync(exception);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
index 450423e..b0b21b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
@@ -18,31 +18,18 @@
 
 package org.apache.flink.runtime.rpc.taskexecutor;
 
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
-import scala.concurrent.Future;
+
+import java.util.UUID;
 
 /**
- * {@link TaskExecutor} rpc gateway interface
+ * {@link TaskExecutor} RPC gateway interface
  */
 public interface TaskExecutorGateway extends RpcGateway {
-	/**
-	 * Execute the given task on the task executor. The task is described by the provided
-	 * {@link TaskDeploymentDescriptor}.
-	 *
-	 * @param taskDeploymentDescriptor Descriptor for the task to be executed
-	 * @return Future acknowledge of the start of the task execution
-	 */
-	Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor);
 
-	/**
-	 * Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then
-	 * the method throws an {@link Exception}.
-	 *
-	 * @param executionAttemptId Execution attempt ID identifying the task to be canceled.
-	 * @return Future acknowledge of the task canceling
-	 */
-	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId);
+	// ------------------------------------------------------------------------
+	//  ResourceManager handlers
+	// ------------------------------------------------------------------------
+
+	void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java
new file mode 100644
index 0000000..641102d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.registration.RegistrationResponse;
+
+import java.io.Serializable;
+
+/**
+ * Base class for responses from the ResourceManager to a registration attempt by a
+ * TaskExecutor.
+ */
+public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.Success implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final InstanceID registrationId;
+
+	private final long heartbeatInterval;
+
+	/**
+	 * Create a new {@code TaskExecutorRegistrationSuccess} message.
+	 * 
+	 * @param registrationId     The ID that the ResourceManager assigned the registration.
+	 * @param heartbeatInterval  The interval in which the ResourceManager will heartbeat the TaskExecutor.
+	 */
+	public TaskExecutorRegistrationSuccess(InstanceID registrationId, long heartbeatInterval) {
+		this.registrationId = registrationId;
+		this.heartbeatInterval = heartbeatInterval;
+	}
+
+	/**
+	 * Gets the ID that the ResourceManager assigned the registration.
+	 */
+	public InstanceID getRegistrationId() {
+		return registrationId;
+	}
+
+	/**
+	 * Gets the interval in which the ResourceManager will heartbeat the TaskExecutor.
+	 */
+	public long getHeartbeatInterval() {
+		return heartbeatInterval;
+	}
+
+	@Override
+	public String toString() {
+		return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')';
+	}
+
+}
+
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
new file mode 100644
index 0000000..ef75862
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.registration.RetryingRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class TaskExecutorToResourceManagerConnection {
+
+	/** the logger for all log messages of this class */
+	private final Logger log;
+
+	/** the TaskExecutor whose connection to the ResourceManager this represents */
+	private final TaskExecutor taskExecutor;
+
+	private final UUID resourceManagerLeaderId;
+
+	private final String resourceManagerAddress;
+
+	private ResourceManagerRegistration pendingRegistration;
+
+	private ResourceManagerGateway registeredResourceManager;
+
+	private InstanceID registrationId;
+
+	/** flag indicating that the connection is closed */
+	private volatile boolean closed;
+
+
+	public TaskExecutorToResourceManagerConnection(
+			Logger log,
+			TaskExecutor taskExecutor,
+			String resourceManagerAddress,
+			UUID resourceManagerLeaderId) {
+
+		this.log = checkNotNull(log);
+		this.taskExecutor = checkNotNull(taskExecutor);
+		this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
+		this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Life cycle
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	public void start() {
+		checkState(!closed, "The connection is already closed");
+		checkState(!isRegistered() && pendingRegistration == null, "The connection is already started");
+
+		ResourceManagerRegistration registration = new ResourceManagerRegistration(
+				log, taskExecutor.getRpcService(),
+				resourceManagerAddress, resourceManagerLeaderId,
+				taskExecutor.getAddress(), taskExecutor.getResourceID());
+
+		Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = registration.getFuture();
+		
+		future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
+			@Override
+			public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
+				registeredResourceManager = result.f0;
+				registrationId = result.f1.getRegistrationId();
+			}
+		}, taskExecutor.getMainThreadExecutionContext());
+		
+		// this future should only ever fail if there is a bug, not if the registration is declined
+		future.onFailure(new OnFailure() {
+			@Override
+			public void onFailure(Throwable failure) {
+				taskExecutor.onFatalError(failure);
+			}
+		}, taskExecutor.getMainThreadExecutionContext());
+	}
+
+	public void close() {
+		closed = true;
+
+		// make sure we do not keep re-trying forever
+		if (pendingRegistration != null) {
+			pendingRegistration.cancel();
+		}
+	}
+
+	public boolean isClosed() {
+		return closed;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	public UUID getResourceManagerLeaderId() {
+		return resourceManagerLeaderId;
+	}
+
+	public String getResourceManagerAddress() {
+		return resourceManagerAddress;
+	}
+
+	/**
+	 * Gets the ResourceManagerGateway. This returns null until the registration is completed.
+	 */
+	public ResourceManagerGateway getResourceManager() {
+		return registeredResourceManager;
+	}
+
+	/**
+	 * Gets the ID under which the TaskExecutor is registered at the ResourceManager.
+	 * This returns null until the registration is completed.
+	 */
+	public InstanceID getRegistrationId() {
+		return registrationId;
+	}
+
+	public boolean isRegistered() {
+		return registeredResourceManager != null;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return String.format("Connection to ResourceManager %s (leaderId=%s)",
+				resourceManagerAddress, resourceManagerLeaderId); 
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	static class ResourceManagerRegistration
+			extends RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
+
+		private final String taskExecutorAddress;
+		
+		private final ResourceID resourceID;
+
+		public ResourceManagerRegistration(
+				Logger log,
+				RpcService rpcService,
+				String targetAddress,
+				UUID leaderId,
+				String taskExecutorAddress,
+				ResourceID resourceID) {
+
+			super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId);
+			this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
+			this.resourceID = checkNotNull(resourceID);
+		}
+
+		@Override
+		protected Future<RegistrationResponse> invokeRegistration(
+				ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
+
+			FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS);
+			return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index fd55904..7b4ab89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -20,15 +20,17 @@ package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
+
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
 import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
 import org.junit.Test;
+
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -41,6 +43,49 @@ import static org.junit.Assert.assertTrue;
 
 public class AkkaRpcServiceTest extends TestLogger {
 
+	// ------------------------------------------------------------------------
+	//  shared test members
+	// ------------------------------------------------------------------------
+
+	private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+	private static AkkaRpcService akkaRpcService =
+			new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+
+	@AfterClass
+	public static void shutdown() {
+		akkaRpcService.stopService();
+		actorSystem.shutdown();
+	}
+
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testScheduleRunnable() throws Exception {
+		final OneShotLatch latch = new OneShotLatch();
+		final long delay = 100;
+		final long start = System.nanoTime();
+
+		akkaRpcService.scheduleRunnable(new Runnable() {
+			@Override
+			public void run() {
+				latch.trigger();
+			}
+		}, delay, TimeUnit.MILLISECONDS);
+
+		latch.await();
+		final long stop = System.nanoTime();
+
+		assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
+	}
+
+	// ------------------------------------------------------------------------
+	//  specific component tests - should be moved to the test classes
+	//  for those components
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the
 	 * {@link AkkaRpcService}.

http://git-wip-us.apache.org/repos/asf/flink/blob/24eab760/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
index c96f4f6..9f9bab3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -18,93 +18,8 @@
 
 package org.apache.flink.runtime.rpc.taskexecutor;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.StartStoppable;
-import org.apache.flink.runtime.util.DirectExecutorService;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.cglib.proxy.InvocationHandler;
-import org.mockito.cglib.proxy.Proxy;
-import scala.concurrent.Future;
-
-import java.net.URL;
-import java.util.Collections;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class TaskExecutorTest extends TestLogger {
-
-	/**
-	 * Tests that we can deploy and cancel a task on the TaskExecutor without exceptions
-	 */
-	@Test
-	public void testTaskExecution() throws Exception {
-		RpcService testingRpcService = mock(RpcService.class);
-		InvocationHandler invocationHandler = mock(InvocationHandler.class);
-		Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler);
-		when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
-
-		DirectExecutorService directExecutorService = new DirectExecutorService();
-		TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
-		taskExecutor.start();
-
-		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-			new JobID(),
-			"Test job",
-			new JobVertexID(),
-			new ExecutionAttemptID(),
-			new SerializedValue<ExecutionConfig>(null),
-			"Test task",
-			0,
-			1,
-			0,
-			new Configuration(),
-			new Configuration(),
-			"Invokable",
-			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-			Collections.<InputGateDeploymentDescriptor>emptyList(),
-			Collections.<BlobKey>emptyList(),
-			Collections.<URL>emptyList(),
-			0
-		);
-
-		Acknowledge ack = taskExecutor.executeTask(tdd);
-
-		ack = taskExecutor.cancelTask(tdd.getExecutionId());
-	}
-
-	/**
-	 * Tests that cancelling a non-existing task will return an exception
-	 */
-	@Test(expected=Exception.class)
-	public void testWrongTaskCancellation() throws Exception {
-		RpcService testingRpcService = mock(RpcService.class);
-		InvocationHandler invocationHandler = mock(InvocationHandler.class);
-		Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler);
-		when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
-		DirectExecutorService directExecutorService = null;
-		TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
-		taskExecutor.start();
-
-		taskExecutor.cancelTask(new ExecutionAttemptID());
-
-		fail("The cancellation should have thrown an exception.");
-	}
+	
 }