You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/27 15:17:17 UTC
flink git commit: [FLINK-4530] [rpc] Generalize
TaskExecutorToResourceManagerConnection to be reusable
Repository: flink
Updated Branches:
refs/heads/flip-6 93775cef6 -> ed5c83dc2
[FLINK-4530] [rpc] Generalize TaskExecutorToResourceManagerConnection to be reusable
This closes #2520
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed5c83dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed5c83dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed5c83dc
Branch: refs/heads/flip-6
Commit: ed5c83dc2c2a9d46f293b0de01342829e2e598a5
Parents: 93775ce
Author: zhuhaifengleon <zh...@gmail.com>
Authored: Mon Sep 26 17:43:44 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 27 16:55:22 2016 +0200
----------------------------------------------------------------------
.../JobMasterToResourceManagerConnection.java | 117 +++++++++++
.../registration/RegisteredRpcConnection.java | 192 +++++++++++++++++++
.../runtime/taskexecutor/TaskExecutor.java | 4 +-
...TaskExecutorToResourceManagerConnection.java | 127 +++---------
.../RegisteredRpcConnectionTest.java | 183 ++++++++++++++++++
.../registration/RetryingRegistrationTest.java | 6 +-
6 files changed, 519 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
new file mode 100644
index 0000000..71fce8c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.slf4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The connection between a JobMaster and the ResourceManager.
+ */
+public class JobMasterToResourceManagerConnection
+ extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> {
+
+ /** the JobMaster whose connection to the ResourceManager this represents */
+ private final JobMaster jobMaster;
+
+ private final JobID jobID;
+
+ private final UUID jobMasterLeaderId;
+
+ public JobMasterToResourceManagerConnection(
+ Logger log,
+ JobID jobID,
+ JobMaster jobMaster,
+ UUID jobMasterLeaderId,
+ String resourceManagerAddress,
+ UUID resourceManagerLeaderId,
+ Executor executor) {
+
+ super(log, resourceManagerAddress, resourceManagerLeaderId, executor);
+ this.jobMaster = checkNotNull(jobMaster);
+ this.jobID = checkNotNull(jobID);
+ this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
+ }
+
+ @Override
+ protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
+ return new JobMasterToResourceManagerConnection.ResourceManagerRegistration(
+ log, jobMaster.getRpcService(),
+ getTargetAddress(), getTargetLeaderId(),
+ jobMaster.getAddress(),jobID, jobMasterLeaderId);
+ }
+
+ @Override
+ protected void onRegistrationSuccess(JobMasterRegistrationSuccess success) {
+ }
+
+ @Override
+ protected void onRegistrationFailure(Throwable failure) {
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class ResourceManagerRegistration
+ extends RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> {
+
+ private final String jobMasterAddress;
+
+ private final JobID jobID;
+
+ private final UUID jobMasterLeaderId;
+
+ ResourceManagerRegistration(
+ Logger log,
+ RpcService rpcService,
+ String targetAddress,
+ UUID leaderId,
+ String jobMasterAddress,
+ JobID jobID,
+ UUID jobMasterLeaderId) {
+
+ super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId);
+ this.jobMasterAddress = checkNotNull(jobMasterAddress);
+ this.jobID = checkNotNull(jobID);
+ this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
+ }
+
+ @Override
+ protected Future<RegistrationResponse> invokeRegistration(
+ ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
+
+ Time timeout = Time.milliseconds(timeoutMillis);
+ return gateway.registerJobMaster(leaderId, jobMasterLeaderId,jobMasterAddress, jobID, timeout);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
new file mode 100644
index 0000000..76093b0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.slf4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This utility class implements the basis of RPC connecting from one component to another component,
+ * for example the RPC connection from TaskExecutor to ResourceManager.
+ * This {@code RegisteredRpcConnection} implements the registration and provides access to the target gateway.
+ *
+ * <p>The registration gives access to a future that is completed upon successful registration.
+ * The RPC connection can be closed, for example when the target where it tries to register
+ * at loses leader status.
+ *
+ * @param <Gateway> The type of the gateway to connect to.
+ * @param <Success> The type of the successful registration responses.
+ */
+public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> {
+
+ /** the logger for all log messages of this class */
+ protected final Logger log;
+
+ /** the target component leaderID, for example the ResourceManager leaderID */
+ private final UUID targetLeaderId;
+
+ /** the target component Address, for example the ResourceManager Address */
+ private final String targetAddress;
+
+ /** Execution context to be used to execute the on complete action of the pending registration */
+ private final Executor executor;
+
+ /** the Registration of this RPC connection */
+ private RetryingRegistration<Gateway, Success> pendingRegistration;
+
+ /** the gateway to register, it's null until the registration is completed */
+ private volatile Gateway targetGateway;
+
+ /** flag indicating that the RPC connection is closed */
+ private volatile boolean closed;
+
+ // ------------------------------------------------------------------------
+
+ public RegisteredRpcConnection(
+ Logger log,
+ String targetAddress,
+ UUID targetLeaderId,
+ Executor executor)
+ {
+ this.log = checkNotNull(log);
+ this.targetAddress = checkNotNull(targetAddress);
+ this.targetLeaderId = checkNotNull(targetLeaderId);
+ this.executor = checkNotNull(executor);
+ }
+
+ // ------------------------------------------------------------------------
+ // Life cycle
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ public void start() {
+ checkState(!closed, "The RPC connection is already closed");
+ checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
+
+ pendingRegistration = checkNotNull(generateRegistration());
+ pendingRegistration.startRegistration();
+
+ Future<Tuple2<Gateway, Success>> future = pendingRegistration.getFuture();
+
+ future.thenAcceptAsync(new AcceptFunction<Tuple2<Gateway, Success>>() {
+ @Override
+ public void accept(Tuple2<Gateway, Success> result) {
+ targetGateway = result.f0;
+ onRegistrationSuccess(result.f1);
+ }
+ }, executor);
+
+ // this future should only ever fail if there is a bug, not if the registration is declined
+ future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+ @Override
+ public Void apply(Throwable failure) {
+ onRegistrationFailure(failure);
+ return null;
+ }
+ }, executor);
+ }
+
+ /**
+ * This method generates a specific registration, for example the TaskExecutor registration at the ResourceManager.
+ */
+ protected abstract RetryingRegistration<Gateway, Success> generateRegistration();
+
+ /**
+ * This method handles the response of a successful registration.
+ */
+ protected abstract void onRegistrationSuccess(Success success);
+
+ /**
+ * This method handles a registration failure.
+ */
+ protected abstract void onRegistrationFailure(Throwable failure);
+
+ /**
+ * Closes the connection and cancels any pending registration so that it is not retried forever.
+ */
+ public void close() {
+ closed = true;
+
+ // make sure we do not keep re-trying forever
+ if (pendingRegistration != null) {
+ pendingRegistration.cancel();
+ }
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ public UUID getTargetLeaderId() {
+ return targetLeaderId;
+ }
+
+ public String getTargetAddress() {
+ return targetAddress;
+ }
+
+ /**
+ * Gets the RegisteredGateway. This returns null until the registration is completed.
+ */
+ public Gateway getTargetGateway() {
+ return targetGateway;
+ }
+
+ public boolean isConnected() {
+ return targetGateway != null;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ String connectionInfo = "(ADDRESS: " + targetAddress + " LEADERID: " + targetLeaderId + ")";
+
+ if (isConnected()) {
+ connectionInfo = "RPC connection to " + targetGateway.getClass().getSimpleName() + " " + connectionInfo;
+ } else {
+ connectionInfo = "RPC connection to " + connectionInfo;
+ }
+
+ if (isClosed()) {
+ connectionInfo = connectionInfo + " is closed";
+ } else if (isConnected()){
+ connectionInfo = connectionInfo + " is established";
+ } else {
+ connectionInfo = connectionInfo + " is connecting";
+ }
+
+ return connectionInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9e3c3b9..9d9ad2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -178,12 +178,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
if (newLeaderAddress != null) {
// the resource manager switched to a new leader
log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
- resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
+ resourceManagerConnection.getTargetAddress(), newLeaderAddress);
}
else {
// address null means that the current leader is lost without a new leader being there, yet
log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
- resourceManagerConnection.getResourceManagerAddress());
+ resourceManagerConnection.getTargetAddress());
}
// drop the current connection or connection attempt
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 647359d..b4b3bae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -19,16 +19,14 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.concurrent.Future;
import org.slf4j.Logger;
@@ -36,115 +34,46 @@ import java.util.UUID;
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* The connection between a TaskExecutor and the ResourceManager.
*/
-public class TaskExecutorToResourceManagerConnection {
-
- /** the logger for all log messages of this class */
- private final Logger log;
+public class TaskExecutorToResourceManagerConnection
+ extends RegisteredRpcConnection<ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
/** the TaskExecutor whose connection to the ResourceManager this represents */
private final TaskExecutor taskExecutor;
- private final UUID resourceManagerLeaderId;
-
- private final String resourceManagerAddress;
-
- /** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */
- private final Executor executor;
-
- private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration;
-
- private volatile ResourceManagerGateway registeredResourceManager;
-
private InstanceID registrationId;
- /** flag indicating that the connection is closed */
- private volatile boolean closed;
-
-
public TaskExecutorToResourceManagerConnection(
- Logger log,
- TaskExecutor taskExecutor,
- String resourceManagerAddress,
- UUID resourceManagerLeaderId,
- Executor executor) {
+ Logger log,
+ TaskExecutor taskExecutor,
+ String resourceManagerAddress,
+ UUID resourceManagerLeaderId,
+ Executor executor) {
- this.log = checkNotNull(log);
+ super(log, resourceManagerAddress, resourceManagerLeaderId, executor);
this.taskExecutor = checkNotNull(taskExecutor);
- this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
- this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
- this.executor = checkNotNull(executor);
- }
-
- // ------------------------------------------------------------------------
- // Life cycle
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("unchecked")
- public void start() {
- checkState(!closed, "The connection is already closed");
- checkState(!isRegistered() && pendingRegistration == null, "The connection is already started");
-
- pendingRegistration = new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
- log, taskExecutor.getRpcService(),
- resourceManagerAddress, resourceManagerLeaderId,
- taskExecutor.getAddress(), taskExecutor.getResourceID());
- pendingRegistration.startRegistration();
-
- Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
-
- future.thenAcceptAsync(new AcceptFunction<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
- @Override
- public void accept(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
- registrationId = result.f1.getRegistrationId();
- registeredResourceManager = result.f0;
- }
- }, executor);
-
- // this future should only ever fail if there is a bug, not if the registration is declined
- future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable failure) {
- taskExecutor.onFatalErrorAsync(failure);
- return null;
- }
- }, executor);
- }
-
- public void close() {
- closed = true;
-
- // make sure we do not keep re-trying forever
- if (pendingRegistration != null) {
- pendingRegistration.cancel();
- }
}
- public boolean isClosed() {
- return closed;
- }
- // ------------------------------------------------------------------------
- // Properties
- // ------------------------------------------------------------------------
-
- public UUID getResourceManagerLeaderId() {
- return resourceManagerLeaderId;
+ @Override
+ protected RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() {
+ return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
+ log, taskExecutor.getRpcService(),
+ getTargetAddress(), getTargetLeaderId(),
+ taskExecutor.getAddress(),taskExecutor.getResourceID());
}
- public String getResourceManagerAddress() {
- return resourceManagerAddress;
+ @Override
+ protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
+ registrationId = success.getRegistrationId();
}
- /**
- * Gets the ResourceManagerGateway. This returns null until the registration is completed.
- */
- public ResourceManagerGateway getResourceManager() {
- return registeredResourceManager;
+ @Override
+ protected void onRegistrationFailure(Throwable failure) {
+ taskExecutor.onFatalErrorAsync(failure);
}
/**
@@ -155,18 +84,6 @@ public class TaskExecutorToResourceManagerConnection {
return registrationId;
}
- public boolean isRegistered() {
- return registeredResourceManager != null;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return String.format("Connection to ResourceManager %s (leaderId=%s)",
- resourceManagerAddress, resourceManagerLeaderId);
- }
-
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
new file mode 100644
index 0000000..8558205
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import org.apache.flink.runtime.registration.RetryingRegistrationTest.TestRegistrationSuccess;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RegisteredRpcConnection, validating the successful, failure and close behavior.
+ */
+public class RegisteredRpcConnectionTest extends TestLogger {
+
+ @Test
+ public void testSuccessfulRpcConnection() throws Exception {
+ final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>";
+ final UUID leaderId = UUID.randomUUID();
+ final String connectionID = "Test RPC Connection ID";
+
+ // an endpoint that immediately returns success
+ TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID));
+ TestingRpcService rpcService = new TestingRpcService();
+
+ try {
+ rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+ TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
+ connection.start();
+
+ //wait for connection established
+ Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+
+ // validate correct invocation and result
+ assertTrue(connection.isConnected());
+ assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
+ assertEquals(leaderId, connection.getTargetLeaderId());
+ assertEquals(testGateway, connection.getTargetGateway());
+ assertEquals(connectionID, connection.getConnectionId());
+ }
+ finally {
+ testGateway.stop();
+ rpcService.stopService();
+ }
+ }
+
+ @Test
+ public void testRpcConnectionFailures() throws Exception {
+ final String connectionFailureMessage = "Test RPC Connection failure";
+ final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>";
+ final UUID leaderId = UUID.randomUUID();
+
+ TestingRpcService rpcService = new TestingRpcService();
+
+ try {
+ // gateway that throws an exception when the registration call is invoked
+ TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+ when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow(
+ new RuntimeException(connectionFailureMessage));
+
+ rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+ TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
+ connection.start();
+
+ //wait for connection failure
+ Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+
+ // validate correct invocation and result
+ assertFalse(connection.isConnected());
+ assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
+ assertEquals(leaderId, connection.getTargetLeaderId());
+ assertNull(connection.getTargetGateway());
+ assertEquals(connectionFailureMessage, connection.getFailareMessage());
+ }
+ finally {
+ rpcService.stopService();
+ }
+ }
+
+ @Test
+ public void testRpcConnectionClose() throws Exception {
+ final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>";
+ final UUID leaderId = UUID.randomUUID();
+ final String connectionID = "Test RPC Connection ID";
+
+ TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID));
+ TestingRpcService rpcService = new TestingRpcService();
+
+ try{
+ rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+ TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
+ connection.start();
+ //close the connection
+ connection.close();
+
+ // validate connection is closed
+ assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
+ assertEquals(leaderId, connection.getTargetLeaderId());
+ assertTrue(connection.isClosed());
+ }
+ finally {
+ testGateway.stop();
+ rpcService.stopService();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test RegisteredRpcConnection
+ // ------------------------------------------------------------------------
+
+ private static class TestRpcConnection extends RegisteredRpcConnection<TestRegistrationGateway, TestRegistrationSuccess> {
+
+ private final RpcService rpcService;
+
+ private String connectionId;
+
+ private String failureMessage;
+
+ public TestRpcConnection(String targetAddress,
+ UUID targetLeaderId,
+ Executor executor,
+ RpcService rpcService)
+ {
+ super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor);
+ this.rpcService = rpcService;
+ }
+
+ @Override
+ protected RetryingRegistration<TestRegistrationGateway, RetryingRegistrationTest.TestRegistrationSuccess> generateRegistration() {
+ return new RetryingRegistrationTest.TestRetryingRegistration(rpcService, getTargetAddress(), getTargetLeaderId());
+ }
+
+ @Override
+ protected void onRegistrationSuccess(RetryingRegistrationTest.TestRegistrationSuccess success) {
+ connectionId = success.getCorrelationId();
+ }
+
+ @Override
+ protected void onRegistrationFailure(Throwable failure) {
+ failureMessage = failure.getMessage();
+ }
+
+ public String getConnectionId() {
+ return connectionId;
+ }
+
+ public String getFailareMessage() {
+ return failureMessage;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index e56a9ec..6d6bbef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -298,12 +298,12 @@ public class RetryingRegistrationTest extends TestLogger {
// test registration
// ------------------------------------------------------------------------
- private static class TestRegistrationSuccess extends RegistrationResponse.Success {
+ protected static class TestRegistrationSuccess extends RegistrationResponse.Success {
private static final long serialVersionUID = 5542698790917150604L;
private final String correlationId;
- private TestRegistrationSuccess(String correlationId) {
+ public TestRegistrationSuccess(String correlationId) {
this.correlationId = correlationId;
}
@@ -312,7 +312,7 @@ public class RetryingRegistrationTest extends TestLogger {
}
}
- private static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
+ protected static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
// we use shorter timeouts here to speed up the tests
static final long INITIAL_TIMEOUT = 20;