You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/27 15:17:17 UTC

flink git commit: [FLINK-4530] [rpc] Generalize TaskExecutorToResourceManagerConnection to be reusable

Repository: flink
Updated Branches:
  refs/heads/flip-6 93775cef6 -> ed5c83dc2


[FLINK-4530] [rpc] Generalize TaskExecutorToResourceManagerConnection to be reusable

This closes #2520


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed5c83dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed5c83dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed5c83dc

Branch: refs/heads/flip-6
Commit: ed5c83dc2c2a9d46f293b0de01342829e2e598a5
Parents: 93775ce
Author: zhuhaifengleon <zh...@gmail.com>
Authored: Mon Sep 26 17:43:44 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 27 16:55:22 2016 +0200

----------------------------------------------------------------------
 .../JobMasterToResourceManagerConnection.java   | 117 +++++++++++
 .../registration/RegisteredRpcConnection.java   | 192 +++++++++++++++++++
 .../runtime/taskexecutor/TaskExecutor.java      |   4 +-
 ...TaskExecutorToResourceManagerConnection.java | 127 +++---------
 .../RegisteredRpcConnectionTest.java            | 183 ++++++++++++++++++
 .../registration/RetryingRegistrationTest.java  |   6 +-
 6 files changed, 519 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
new file mode 100644
index 0000000..71fce8c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.slf4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The connection between a JobMaster and the ResourceManager.
+ */
+public class JobMasterToResourceManagerConnection 
+		extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> {
+
+	/** the JobMaster whose connection to the ResourceManager this represents */
+	private final JobMaster jobMaster;
+
+	private final JobID jobID;
+
+	private final UUID jobMasterLeaderId;
+
+	public JobMasterToResourceManagerConnection(
+			Logger log,
+			JobID jobID,
+			JobMaster jobMaster,
+			UUID jobMasterLeaderId,
+			String resourceManagerAddress,
+			UUID resourceManagerLeaderId,
+			Executor executor) {
+
+		super(log, resourceManagerAddress, resourceManagerLeaderId, executor);
+		this.jobMaster = checkNotNull(jobMaster);
+		this.jobID = checkNotNull(jobID);
+		this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
+	}
+
+	@Override
+	protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
+		return new JobMasterToResourceManagerConnection.ResourceManagerRegistration(
+			log, jobMaster.getRpcService(),
+			getTargetAddress(), getTargetLeaderId(),
+			jobMaster.getAddress(),jobID, jobMasterLeaderId);
+	}
+
+	@Override
+	protected void onRegistrationSuccess(JobMasterRegistrationSuccess success) {
+	}
+
+	@Override
+	protected void onRegistrationFailure(Throwable failure) {
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static class ResourceManagerRegistration
+		extends RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> {
+
+		private final String jobMasterAddress;
+
+		private final JobID jobID;
+
+		private final UUID jobMasterLeaderId;
+
+		ResourceManagerRegistration(
+			Logger log,
+			RpcService rpcService,
+			String targetAddress,
+			UUID leaderId,
+			String jobMasterAddress,
+			JobID jobID,
+			UUID jobMasterLeaderId) {
+
+			super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId);
+			this.jobMasterAddress = checkNotNull(jobMasterAddress);
+			this.jobID = checkNotNull(jobID);
+			this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
+		}
+
+		@Override
+		protected Future<RegistrationResponse> invokeRegistration(
+			ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
+
+			Time timeout = Time.milliseconds(timeoutMillis);
+			return gateway.registerJobMaster(leaderId, jobMasterLeaderId,jobMasterAddress, jobID, timeout);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
new file mode 100644
index 0000000..76093b0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.slf4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This utility class implements the basis of an RPC connection from one component to another,
+ * for example the RPC connection from the TaskExecutor to the ResourceManager.
+ * This {@code RegisteredRpcConnection} implements the registration and provides access to the target gateway.
+ *
+ * <p>The registration gives access to a future that is completed upon successful registration.
+ * The RPC connection can be closed, for example when the target at which it tries to
+ * register loses leader status.
+ *
+ * @param <Gateway> The type of the gateway to connect to.
+ * @param <Success> The type of the successful registration responses.
+ */
+public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> {
+
+	/** the logger for all log messages of this class */
+	protected final Logger log;
+
+	/** the target component leaderID, for example the ResourceManager leaderID */
+	private final UUID targetLeaderId;
+
+	/** the target component Address, for example the ResourceManager Address */
+	private final String targetAddress;
+
+	/** Executor used to run the completion callbacks (success or failure) of the pending registration */
+	private final Executor executor;
+
+	/** the Registration of this RPC connection */
+	private RetryingRegistration<Gateway, Success> pendingRegistration;
+
+	/** the gateway of the registration target; it is null until the registration is completed */
+	private volatile Gateway targetGateway;
+
+	/** flag indicating that the RPC connection is closed */
+	private volatile boolean closed;
+
+	// ------------------------------------------------------------------------
+
+	public RegisteredRpcConnection(
+		Logger log,
+		String targetAddress,
+		UUID targetLeaderId,
+		Executor executor)
+	{
+		this.log = checkNotNull(log);
+		this.targetAddress = checkNotNull(targetAddress);
+		this.targetLeaderId = checkNotNull(targetLeaderId);
+		this.executor = checkNotNull(executor);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Life cycle
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	public void start() {
+		checkState(!closed, "The RPC connection is already closed");
+		checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
+
+		pendingRegistration = checkNotNull(generateRegistration());
+		pendingRegistration.startRegistration();
+
+		Future<Tuple2<Gateway, Success>> future = pendingRegistration.getFuture();
+
+		future.thenAcceptAsync(new AcceptFunction<Tuple2<Gateway, Success>>() {
+			@Override
+			public void accept(Tuple2<Gateway, Success> result) {
+				targetGateway = result.f0;
+				onRegistrationSuccess(result.f1);
+			}
+		}, executor);
+
+		// this future should only ever fail if there is a bug, not if the registration is declined
+		future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+			@Override
+			public Void apply(Throwable failure) {
+				onRegistrationFailure(failure);
+				return null;
+			}
+		}, executor);
+	}
+
+	/**
+	 * Generates the specific registration, for example the TaskExecutor registration at the ResourceManager.
+	 */
+	protected abstract RetryingRegistration<Gateway, Success> generateRegistration();
+
+	/**
+	 * Handles the response of a successful registration.
+	 */
+	protected abstract void onRegistrationSuccess(Success success);
+
+	/**
+	 * Handles a registration failure.
+	 */
+	protected abstract void onRegistrationFailure(Throwable failure);
+
+	/**
+	 * Closes the connection and cancels a pending registration, if there is one.
+	 */
+	public void close() {
+		closed = true;
+
+		// make sure we do not keep re-trying forever
+		if (pendingRegistration != null) {
+			pendingRegistration.cancel();
+		}
+	}
+
+	public boolean isClosed() {
+		return closed;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	public UUID getTargetLeaderId() {
+		return targetLeaderId;
+	}
+
+	public String getTargetAddress() {
+		return targetAddress;
+	}
+
+	/**
+	 * Gets the RegisteredGateway. This returns null until the registration is completed.
+	 */
+	public Gateway getTargetGateway() {
+		return targetGateway;
+	}
+
+	public boolean isConnected() {
+		return targetGateway != null;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		String connectionInfo = "(ADDRESS: " + targetAddress + " LEADERID: " + targetLeaderId + ")";
+
+		if (isConnected()) {
+			connectionInfo = "RPC connection to " + targetGateway.getClass().getSimpleName() + " " + connectionInfo;
+		} else {
+			connectionInfo = "RPC connection to " + connectionInfo;
+		}
+
+		if (isClosed()) {
+			connectionInfo = connectionInfo + " is closed";
+		} else if (isConnected()){
+			connectionInfo = connectionInfo + " is established";
+		} else {
+			connectionInfo = connectionInfo + " is connecting";
+		}
+
+		return connectionInfo;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9e3c3b9..9d9ad2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -178,12 +178,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			if (newLeaderAddress != null) {
 				// the resource manager switched to a new leader
 				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-					resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
+					resourceManagerConnection.getTargetAddress(), newLeaderAddress);
 			}
 			else {
 				// address null means that the current leader is lost without a new leader being there, yet
 				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-					resourceManagerConnection.getResourceManagerAddress());
+					resourceManagerConnection.getTargetAddress());
 			}
 
 			// drop the current connection or connection attempt

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 647359d..b4b3bae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -19,16 +19,14 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.concurrent.Future;
 
 import org.slf4j.Logger;
 
@@ -36,115 +34,46 @@ import java.util.UUID;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The connection between a TaskExecutor and the ResourceManager.
  */
-public class TaskExecutorToResourceManagerConnection {
-
-	/** the logger for all log messages of this class */
-	private final Logger log;
+public class TaskExecutorToResourceManagerConnection
+		extends RegisteredRpcConnection<ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
 
 	/** the TaskExecutor whose connection to the ResourceManager this represents */
 	private final TaskExecutor taskExecutor;
 
-	private final UUID resourceManagerLeaderId;
-
-	private final String resourceManagerAddress;
-
-	/** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */
-	private final Executor executor;
-
-	private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration;
-
-	private volatile ResourceManagerGateway registeredResourceManager;
-
 	private InstanceID registrationId;
 
-	/** flag indicating that the connection is closed */
-	private volatile boolean closed;
-
-
 	public TaskExecutorToResourceManagerConnection(
-		Logger log,
-		TaskExecutor taskExecutor,
-		String resourceManagerAddress,
-		UUID resourceManagerLeaderId,
-		Executor executor) {
+			Logger log,
+			TaskExecutor taskExecutor,
+			String resourceManagerAddress,
+			UUID resourceManagerLeaderId,
+			Executor executor) {
 
-		this.log = checkNotNull(log);
+		super(log, resourceManagerAddress, resourceManagerLeaderId, executor);
 		this.taskExecutor = checkNotNull(taskExecutor);
-		this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
-		this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
-		this.executor = checkNotNull(executor);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Life cycle
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("unchecked")
-	public void start() {
-		checkState(!closed, "The connection is already closed");
-		checkState(!isRegistered() && pendingRegistration == null, "The connection is already started");
-
-		pendingRegistration = new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
-				log, taskExecutor.getRpcService(),
-				resourceManagerAddress, resourceManagerLeaderId,
-				taskExecutor.getAddress(), taskExecutor.getResourceID());
-		pendingRegistration.startRegistration();
-
-		Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
-
-		future.thenAcceptAsync(new AcceptFunction<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
-			@Override
-			public void accept(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
-				registrationId = result.f1.getRegistrationId();
-				registeredResourceManager = result.f0;
-			}
-		}, executor);
-		
-		// this future should only ever fail if there is a bug, not if the registration is declined
-		future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-			@Override
-			public Void apply(Throwable failure) {
-				taskExecutor.onFatalErrorAsync(failure);
-				return null;
-			}
-		}, executor);
-	}
-
-	public void close() {
-		closed = true;
-
-		// make sure we do not keep re-trying forever
-		if (pendingRegistration != null) {
-			pendingRegistration.cancel();
-		}
 	}
 
-	public boolean isClosed() {
-		return closed;
-	}
 
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	public UUID getResourceManagerLeaderId() {
-		return resourceManagerLeaderId;
+	@Override
+	protected RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() {
+		return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
+			log, taskExecutor.getRpcService(),
+			getTargetAddress(), getTargetLeaderId(),
+			taskExecutor.getAddress(),taskExecutor.getResourceID());
 	}
 
-	public String getResourceManagerAddress() {
-		return resourceManagerAddress;
+	@Override
+	protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
+		registrationId = success.getRegistrationId();
 	}
 
-	/**
-	 * Gets the ResourceManagerGateway. This returns null until the registration is completed.
-	 */
-	public ResourceManagerGateway getResourceManager() {
-		return registeredResourceManager;
+	@Override
+	protected void onRegistrationFailure(Throwable failure) {
+		taskExecutor.onFatalErrorAsync(failure);
 	}
 
 	/**
@@ -155,18 +84,6 @@ public class TaskExecutorToResourceManagerConnection {
 		return registrationId;
 	}
 
-	public boolean isRegistered() {
-		return registeredResourceManager != null;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return String.format("Connection to ResourceManager %s (leaderId=%s)",
-				resourceManagerAddress, resourceManagerLeaderId); 
-	}
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
new file mode 100644
index 0000000..8558205
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import org.apache.flink.runtime.registration.RetryingRegistrationTest.TestRegistrationSuccess;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RegisteredRpcConnection, validating the successful, failure and close behavior.
+ */
+public class RegisteredRpcConnectionTest extends TestLogger {
+
+	@Test
+	public void testSuccessfulRpcConnection() throws Exception {
+		final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>";
+		final UUID leaderId = UUID.randomUUID();
+		final String connectionID = "Test RPC Connection ID";
+
+		// an endpoint that immediately returns success
+		TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID));
+		TestingRpcService rpcService = new TestingRpcService();
+
+		try {
+			rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+			TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
+			connection.start();
+
+			// wait for the connection to be established
+			Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+
+			// validate correct invocation and result
+			assertTrue(connection.isConnected());
+			assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
+			assertEquals(leaderId, connection.getTargetLeaderId());
+			assertEquals(testGateway, connection.getTargetGateway());
+			assertEquals(connectionID, connection.getConnectionId());
+		}
+		finally {
+			testGateway.stop();
+			rpcService.stopService();
+		}
+	}
+
+	@Test
+	public void testRpcConnectionFailures() throws Exception {
+		final String connectionFailureMessage = "Test RPC Connection failure";
+		final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>";
+		final UUID leaderId = UUID.randomUUID();
+
+		TestingRpcService rpcService = new TestingRpcService();
+
+		try {
+			// gateway that throws an exception when called
+			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow(
+				new RuntimeException(connectionFailureMessage));
+
+			rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+			TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
+			connection.start();
+
+			// wait for the connection to fail
+			Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+
+			// validate correct invocation and result
+			assertFalse(connection.isConnected());
+			assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
+			assertEquals(leaderId, connection.getTargetLeaderId());
+			assertNull(connection.getTargetGateway());
+			assertEquals(connectionFailureMessage, connection.getFailareMessage());
+		}
+		finally {
+			rpcService.stopService();
+		}
+	}
+
+	@Test
+	public void testRpcConnectionClose() throws Exception {
+		final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>";
+		final UUID leaderId = UUID.randomUUID();
+		final String connectionID = "Test RPC Connection ID";
+
+		TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID));
+		TestingRpcService rpcService = new TestingRpcService();
+
+		try{
+			rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+			TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
+			connection.start();
+			//close the connection
+			connection.close();
+
+			// validate connection is closed
+			assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
+			assertEquals(leaderId, connection.getTargetLeaderId());
+			assertTrue(connection.isClosed());
+		}
+		finally {
+			testGateway.stop();
+			rpcService.stopService();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test RegisteredRpcConnection
+	// ------------------------------------------------------------------------
+
+	private static class TestRpcConnection extends RegisteredRpcConnection<TestRegistrationGateway, TestRegistrationSuccess> {
+
+		private final RpcService rpcService;
+
+		private String connectionId;
+
+		private String failureMessage;
+
+		public TestRpcConnection(String targetAddress,
+								 UUID targetLeaderId,
+								 Executor executor,
+								 RpcService rpcService)
+		{
+			super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor);
+			this.rpcService = rpcService;
+		}
+
+		@Override
+		protected RetryingRegistration<TestRegistrationGateway, RetryingRegistrationTest.TestRegistrationSuccess> generateRegistration() {
+			return new RetryingRegistrationTest.TestRetryingRegistration(rpcService, getTargetAddress(), getTargetLeaderId());
+		}
+
+		@Override
+		protected void onRegistrationSuccess(RetryingRegistrationTest.TestRegistrationSuccess success) {
+			connectionId = success.getCorrelationId();
+		}
+
+		@Override
+		protected void onRegistrationFailure(Throwable failure) {
+			failureMessage = failure.getMessage();
+		}
+
+		public String getConnectionId() {
+			return connectionId;
+		}
+
+		public String getFailareMessage() {
+			return failureMessage;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index e56a9ec..6d6bbef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -298,12 +298,12 @@ public class RetryingRegistrationTest extends TestLogger {
 	//  test registration
 	// ------------------------------------------------------------------------
 
-	private static class TestRegistrationSuccess extends RegistrationResponse.Success {
+	protected static class TestRegistrationSuccess extends RegistrationResponse.Success {
 		private static final long serialVersionUID = 5542698790917150604L;
 
 		private final String correlationId;
 
-		private TestRegistrationSuccess(String correlationId) {
+		public TestRegistrationSuccess(String correlationId) {
 			this.correlationId = correlationId;
 		}
 
@@ -312,7 +312,7 @@ public class RetryingRegistrationTest extends TestLogger {
 		}
 	}
 
-	private static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
+	protected static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
 
 		// we use shorter timeouts here to speed up the tests
 		static final long INITIAL_TIMEOUT = 20;