You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:53:04 UTC
[31/50] [abbrv] flink git commit: [FLINK-4355] [cluster management]
Add tests for the TaskManager -> ResourceManager registration.
[FLINK-4355] [cluster management] Add tests for the TaskManager -> ResourceManager registration.
This closes #2395.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ea97a18
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ea97a18
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ea97a18
Branch: refs/heads/flip-6
Commit: 5ea97a185d266f96570a4ea3c967ecbc384378cd
Parents: e6b0f12
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 19 23:45:54 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 21 11:39:15 2016 +0200
----------------------------------------------------------------------
.../rpc/registration/RetryingRegistration.java | 4 +
.../runtime/rpc/taskexecutor/SlotReport.java | 38 ---
.../runtime/rpc/taskexecutor/TaskExecutor.java | 12 +
...TaskExecutorToResourceManagerConnection.java | 4 +
.../TestingHighAvailabilityServices.java | 53 +++
.../flink/runtime/rpc/TestingGatewayBase.java | 18 +-
.../registration/RetryingRegistrationTest.java | 336 +++++++++++++++++++
.../registration/TestRegistrationGateway.java | 85 +++++
.../rpc/taskexecutor/TaskExecutorTest.java | 92 ++++-
9 files changed, 602 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
index 4c93684..dcb5011 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
@@ -58,12 +58,16 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
// default configuration values
// ------------------------------------------------------------------------
+ /** default value for the initial registration timeout (milliseconds) */
private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
+ /** default value for the maximum registration timeout, after exponential back-off (milliseconds) */
private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
+ /** The pause (milliseconds) made after a registration attempt caused an exception (other than timeout) */
private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
+ /** The pause (milliseconds) made after the registration attempt was refused */
private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
deleted file mode 100644
index e42fa4a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import java.io.Serializable;
-
-/**
- * A report about the current status of all slots of the TaskExecutor, describing
- * which slots are available and allocated, and what jobs (JobManagers) the allocated slots
- * have been allocated to.
- */
-public class SlotReport implements Serializable{
-
- private static final long serialVersionUID = 1L;
-
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "SlotReport";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
index 1a637bb..f201e00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rpc.taskexecutor;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
@@ -72,6 +73,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
@Override
public void start() {
+ super.start();
+
// start by connecting to the ResourceManager
try {
haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener());
@@ -148,6 +151,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
// ------------------------------------------------------------------------
+ // Access to fields for testing
+ // ------------------------------------------------------------------------
+
+ @VisibleForTesting
+ TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
+ return resourceManagerConnection;
+ }
+
+ // ------------------------------------------------------------------------
// Utility classes
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
index ef75862..f398b7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -40,6 +40,9 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
+/**
+ * The connection between a TaskExecutor and the ResourceManager.
+ */
public class TaskExecutorToResourceManagerConnection {
/** the logger for all log messages of this class */
@@ -87,6 +90,7 @@ public class TaskExecutorToResourceManagerConnection {
log, taskExecutor.getRpcService(),
resourceManagerAddress, resourceManagerLeaderId,
taskExecutor.getAddress(), taskExecutor.getResourceID());
+ registration.startRegistration();
Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = registration.getFuture();
http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
new file mode 100644
index 0000000..3a9f943
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * A variant of the HighAvailabilityServices for testing. Each individual service can be set
+ * to an arbitrary implementation, such as a mock or default service.
+ */
+public class TestingHighAvailabilityServices implements HighAvailabilityServices {
+
+ private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
+
+
+ // ------------------------------------------------------------------------
+ // Setters for mock / testing implementations
+ // ------------------------------------------------------------------------
+
+ public void setResourceManagerLeaderRetriever(LeaderRetrievalService resourceManagerLeaderRetriever) {
+ this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
+ }
+
+ // ------------------------------------------------------------------------
+ // HA Services Methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+ LeaderRetrievalService service = this.resourceManagerLeaderRetriever;
+ if (service != null) {
+ return service;
+ } else {
+ throw new IllegalStateException("ResourceManagerLeaderRetriever has not been set");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index 4256135..8133a87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -34,8 +34,15 @@ public abstract class TestingGatewayBase implements RpcGateway {
private final ScheduledExecutorService executor;
- protected TestingGatewayBase() {
+ private final String address;
+
+ protected TestingGatewayBase(final String address) {
this.executor = Executors.newSingleThreadScheduledExecutor();
+ this.address = address;
+ }
+
+ protected TestingGatewayBase() {
+ this("localhost");
}
// ------------------------------------------------------------------------
@@ -53,6 +60,15 @@ public abstract class TestingGatewayBase implements RpcGateway {
}
// ------------------------------------------------------------------------
+ // Base class methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String getAddress() {
+ return address;
+ }
+
+ // ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
new file mode 100644
index 0000000..9508825
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.registration;
+
+import akka.dispatch.Futures;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the generic retrying registration class, validating the failure, retry, and back-off behavior.
+ */
+public class RetryingRegistrationTest extends TestLogger {
+
+ @Test
+ public void testSimpleSuccessfulRegistration() throws Exception {
+ final String testId = "laissez les bon temps roulez";
+ final String testEndpointAddress = "<test-address>";
+ final UUID leaderId = UUID.randomUUID();
+
+ // an endpoint that immediately returns success
+ TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
+ TestingRpcService rpc = new TestingRpcService();
+
+ try {
+ rpc.registerGateway(testEndpointAddress, testGateway);
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+ registration.startRegistration();
+
+ Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+ assertNotNull(future);
+
+ // multiple accesses return the same future
+ assertEquals(future, registration.getFuture());
+
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ Await.result(future, new FiniteDuration(10, SECONDS));
+
+ // validate correct invocation and result
+ assertEquals(testId, success.f1.getCorrelationId());
+ assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+ }
+ finally {
+ testGateway.stop();
+ rpc.stopService();
+ }
+ }
+
+ @Test
+ public void testPropagateFailures() throws Exception {
+ final String testExceptionMessage = "testExceptionMessage";
+
+ // RPC service that throws an exception when asked to connect
+ RpcService rpc = mock(RpcService.class);
+ when(rpc.connect(anyString(), any(Class.class))).thenThrow(new RuntimeException(testExceptionMessage));
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
+ registration.startRegistration();
+
+ Future<?> future = registration.getFuture();
+ assertTrue(future.failed().isCompleted());
+
+ assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage());
+ }
+
+ @Test
+ public void testRetryConnectOnFailure() throws Exception {
+ final String testId = "laissez les bon temps roulez";
+ final UUID leaderId = UUID.randomUUID();
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId));
+
+ try {
+ // RPC service that fails upon the first connection, but succeeds on the second
+ RpcService rpc = mock(RpcService.class);
+ when(rpc.connect(anyString(), any(Class.class))).thenReturn(
+ Futures.failed(new Exception("test connect failure")), // first connection attempt fails
+ Futures.successful(testGateway) // second connection attempt succeeds
+ );
+ when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor));
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId);
+ registration.startRegistration();
+
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS));
+
+ // validate correct invocation and result
+ assertEquals(testId, success.f1.getCorrelationId());
+ assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+ }
+ finally {
+ testGateway.stop();
+ executor.shutdown();
+ }
+ }
+
+ @Test
+ public void testRetriesOnTimeouts() throws Exception {
+ final String testId = "rien ne va plus";
+ final String testEndpointAddress = "<test-address>";
+ final UUID leaderId = UUID.randomUUID();
+
+ // an endpoint that answers the first two calls with futures that time out, and the third call with a successful future
+ TestRegistrationGateway testGateway = new TestRegistrationGateway(
+ null, // timeout
+ null, // timeout
+ new TestRegistrationSuccess(testId) // success
+ );
+
+ TestingRpcService rpc = new TestingRpcService();
+
+ try {
+ rpc.registerGateway(testEndpointAddress, testGateway);
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+ long started = System.nanoTime();
+ registration.startRegistration();
+
+ Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ Await.result(future, new FiniteDuration(10, SECONDS));
+
+ long finished = System.nanoTime();
+ long elapsedMillis = (finished - started) / 1000000;
+
+ // validate correct invocation and result
+ assertEquals(testId, success.f1.getCorrelationId());
+ assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+
+ // validate that some retry-delay / back-off behavior happened
+ assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
+ }
+ finally {
+ rpc.stopService();
+ testGateway.stop();
+ }
+ }
+
+ @Test
+ public void testDecline() throws Exception {
+ final String testId = "qui a coupe le fromage";
+ final String testEndpointAddress = "<test-address>";
+ final UUID leaderId = UUID.randomUUID();
+
+ TestingRpcService rpc = new TestingRpcService();
+
+ TestRegistrationGateway testGateway = new TestRegistrationGateway(
+ null, // timeout
+ new RegistrationResponse.Decline("no reason "),
+ null, // timeout
+ new TestRegistrationSuccess(testId) // success
+ );
+
+ try {
+ rpc.registerGateway(testEndpointAddress, testGateway);
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+ long started = System.nanoTime();
+ registration.startRegistration();
+
+ Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ Await.result(future, new FiniteDuration(10, SECONDS));
+
+ long finished = System.nanoTime();
+ long elapsedMillis = (finished - started) / 1000000;
+
+ // validate correct invocation and result
+ assertEquals(testId, success.f1.getCorrelationId());
+ assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
+
+ // validate that some retry-delay / back-off behavior happened
+ assertTrue("retries did not properly back off", elapsedMillis >=
+ 2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE);
+ }
+ finally {
+ testGateway.stop();
+ rpc.stopService();
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testRetryOnError() throws Exception {
+ final String testId = "Petit a petit, l'oiseau fait son nid";
+ final String testEndpointAddress = "<test-address>";
+ final UUID leaderId = UUID.randomUUID();
+
+ TestingRpcService rpc = new TestingRpcService();
+
+ try {
+ // gateway that responds to the first call with a failure, and to later calls with a success
+ TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+
+ when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
+ Futures.<RegistrationResponse>failed(new Exception("test exception")),
+ Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId)));
+
+ rpc.registerGateway(testEndpointAddress, testGateway);
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+ long started = System.nanoTime();
+ registration.startRegistration();
+
+ Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ Await.result(future, new FiniteDuration(10, SECONDS));
+
+ long finished = System.nanoTime();
+ long elapsedMillis = (finished - started) / 1000000;
+
+ assertEquals(testId, success.f1.getCorrelationId());
+
+ // validate that some retry-delay / back-off behavior happened
+ assertTrue("retries did not properly back off",
+ elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR);
+ }
+ finally {
+ rpc.stopService();
+ }
+ }
+
+ @Test
+ public void testCancellation() throws Exception {
+ final String testEndpointAddress = "my-test-address";
+ final UUID leaderId = UUID.randomUUID();
+
+ TestingRpcService rpc = new TestingRpcService();
+
+ try {
+ Promise<RegistrationResponse> result = Futures.promise();
+
+ TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+ when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future());
+
+ rpc.registerGateway(testEndpointAddress, testGateway);
+
+ TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+ registration.startRegistration();
+
+ // cancel and fail the current registration attempt
+ registration.cancel();
+ result.failure(new TimeoutException());
+
+ // there should not be a second registration attempt
+ verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong());
+ }
+ finally {
+ rpc.stopService();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test registration
+ // ------------------------------------------------------------------------
+
+ private static class TestRegistrationSuccess extends RegistrationResponse.Success {
+ private static final long serialVersionUID = 5542698790917150604L;
+
+ private final String correlationId;
+
+ private TestRegistrationSuccess(String correlationId) {
+ this.correlationId = correlationId;
+ }
+
+ public String getCorrelationId() {
+ return correlationId;
+ }
+ }
+
+ private static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
+
+ // we use shorter timeouts here to speed up the tests
+ static final long INITIAL_TIMEOUT = 20;
+ static final long MAX_TIMEOUT = 200;
+ static final long DELAY_ON_ERROR = 200;
+ static final long DELAY_ON_DECLINE = 200;
+
+ public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) {
+ super(LoggerFactory.getLogger(RetryingRegistrationTest.class),
+ rpc, "TestEndpoint",
+ TestRegistrationGateway.class,
+ targetAddress, leaderId,
+ INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, DELAY_ON_DECLINE);
+ }
+
+ @Override
+ protected Future<RegistrationResponse> invokeRegistration(
+ TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
+ return gateway.registrationCall(leaderId, timeoutMillis);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
new file mode 100644
index 0000000..a049e48
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.registration;
+
+import akka.dispatch.Futures;
+
+import org.apache.flink.runtime.rpc.TestingGatewayBase;
+import org.apache.flink.util.Preconditions;
+
+import scala.concurrent.Future;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestRegistrationGateway extends TestingGatewayBase {
+
+ private final BlockingQueue<RegistrationCall> invocations;
+
+ private final RegistrationResponse[] responses;
+
+ private int pos;
+
+ public TestRegistrationGateway(RegistrationResponse... responses) {
+ Preconditions.checkArgument(responses != null && responses.length > 0);
+
+ this.invocations = new LinkedBlockingQueue<>();
+ this.responses = responses;
+
+ }
+
+ // ------------------------------------------------------------------------
+
+ public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
+ invocations.add(new RegistrationCall(leaderId, timeout));
+
+ RegistrationResponse response = responses[pos];
+ if (pos < responses.length - 1) {
+ pos++;
+ }
+
+ // return a completed future (for a proper value), or one that never completes and will time out (for null)
+ return response != null ? Futures.successful(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
+ }
+
+ public BlockingQueue<RegistrationCall> getInvocations() {
+ return invocations;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static class RegistrationCall {
+ private final UUID leaderId;
+ private final long timeout;
+
+ public RegistrationCall(UUID leaderId, long timeout) {
+ this.leaderId = leaderId;
+ this.timeout = timeout;
+ }
+
+ public UUID leaderId() {
+ return leaderId;
+ }
+
+ public long timeout() {
+ return timeout;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ea97a18/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
index 9f9bab3..b831ead 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -18,8 +18,98 @@
package org.apache.flink.runtime.rpc.taskexecutor;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
public class TaskExecutorTest extends TestLogger {
-
+
+ @Test
+ public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
+ final ResourceID resourceID = ResourceID.generate();
+ final String resourceManagerAddress = "/resource/manager/address/one";
+
+ final TestingRpcService rpc = new TestingRpcService();
+ try {
+ // register a mock resource manager gateway
+ ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+ rpc.registerGateway(resourceManagerAddress, rmGateway);
+
+ NonHaServices haServices = new NonHaServices(resourceManagerAddress);
+ TaskExecutor taskManager = new TaskExecutor(rpc, haServices, resourceID);
+ String taskManagerAddress = taskManager.getAddress();
+
+ taskManager.start();
+
+ verify(rmGateway, timeout(5000)).registerTaskExecutor(
+ any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+ }
+ finally {
+ rpc.stopService();
+ }
+ }
+
+ @Test
+ public void testTriggerRegistrationOnLeaderChange() throws Exception {
+ final ResourceID resourceID = ResourceID.generate();
+
+ final String address1 = "/resource/manager/address/one";
+ final String address2 = "/resource/manager/address/two";
+ final UUID leaderId1 = UUID.randomUUID();
+ final UUID leaderId2 = UUID.randomUUID();
+
+ final TestingRpcService rpc = new TestingRpcService();
+ try {
+ // register the mock resource manager gateways
+ ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
+ ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
+ rpc.registerGateway(address1, rmGateway1);
+ rpc.registerGateway(address2, rmGateway2);
+
+ TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
+
+ TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+ TaskExecutor taskManager = new TaskExecutor(rpc, haServices, resourceID);
+ String taskManagerAddress = taskManager.getAddress();
+ taskManager.start();
+
+ // no connection initially, since there is no leader
+ assertNull(taskManager.getResourceManagerConnection());
+
+ // define a leader and see that a registration happens
+ testLeaderService.notifyListener(address1, leaderId1);
+
+ verify(rmGateway1, timeout(5000)).registerTaskExecutor(
+ eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+ assertNotNull(taskManager.getResourceManagerConnection());
+
+ // revoke the leader (notify the listener that there is currently no leader)
+ testLeaderService.notifyListener(null, null);
+
+ // set a new leader, see that a registration happens
+ testLeaderService.notifyListener(address2, leaderId2);
+
+ verify(rmGateway2, timeout(5000)).registerTaskExecutor(
+ eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+ assertNotNull(taskManager.getResourceManagerConnection());
+ }
+ finally {
+ rpc.stopService();
+ }
+ }
}