You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:21:51 UTC
[11/50] [abbrv] flink git commit: [hotfix] [rpc] Add
RpcConnectionTest to validate that connection buildup fails fast when
endpoint is unreachable.
[hotfix] [rpc] Add RpcConnectionTest to validate that connection buildup fails fast when endpoint is unreachable.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aface1f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aface1f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aface1f3
Branch: refs/heads/flip-6
Commit: aface1f30562b608f4290f9be52fc485c749e0f1
Parents: 564f9a2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 21 13:03:17 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:46:22 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/rpc/AsyncCallsTest.java | 4 +-
.../flink/runtime/rpc/RpcCompletenessTest.java | 14 ++--
.../flink/runtime/rpc/RpcConnectionTest.java | 86 ++++++++++++++++++++
3 files changed, 96 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aface1f3/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index e8255d4..7affdb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -43,9 +43,9 @@ public class AsyncCallsTest extends TestLogger {
// shared test members
// ------------------------------------------------------------------------
- private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+ private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
- private static AkkaRpcService akkaRpcService =
+ private static final AkkaRpcService akkaRpcService =
new AkkaRpcService(actorSystem, Time.milliseconds(10000L));
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/aface1f3/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index ee3f784..53355e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -30,6 +30,7 @@ import org.reflections.Reflections;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -69,7 +70,8 @@ public class RpcCompletenessTest extends TestLogger {
@SuppressWarnings("rawtypes")
private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
- Method[] gatewayMethods = getRpcMethodsFromGateway(rpcGateway).toArray(new Method[0]);
+ List<Method> rpcMethodsFromGateway = getRpcMethodsFromGateway(rpcGateway);
+ Method[] gatewayMethods = rpcMethodsFromGateway.toArray(new Method[rpcMethodsFromGateway.size()]);
Method[] serverMethods = rpcEndpoint.getMethods();
Map<String, Set<Method>> rpcMethods = new HashMap<>();
@@ -360,13 +362,13 @@ public class RpcCompletenessTest extends TestLogger {
}
// Get all methods declared in current interface
- for(Method method : interfaceClass.getDeclaredMethods()) {
- allMethods.add(method);
- }
+ Collections.addAll(allMethods, interfaceClass.getDeclaredMethods());
// Get all methods inherited from super interfaces
- for(Class superClass : interfaceClass.getInterfaces()) {
- allMethods.addAll(getRpcMethodsFromGateway(superClass));
+ for (Class<?> superClass : interfaceClass.getInterfaces()) {
+ @SuppressWarnings("unchecked")
+ Class<? extends RpcGateway> gatewayClass = (Class<? extends RpcGateway>) superClass;
+ allMethods.addAll(getRpcMethodsFromGateway(gatewayClass));
}
return allMethods;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aface1f3/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
new file mode 100644
index 0000000..6363662
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+/**
+ * This test validates that the RPC service gives a good message when it cannot
+ * connect to an RpcEndpoint.
+ *
+ * <p>The test asks the RPC service to connect to the address
+ * {@code "foo.bar.com.test.invalid"} and waits on the returned future. It passes only
+ * if waiting fails with an {@link RpcConnectionException} whose message contains that
+ * address. Normal completion of the future, a {@link TimeoutException}, or any other
+ * throwable makes the test fail.
+ *
+ * <p>Both the RPC service timeout and the await duration are set to 10000000 seconds,
+ * so a pass cannot be the result of a timeout expiring.
+ */
+public class RpcConnectionTest {
+
+ @Test
+ public void testConnectFailure() {
+ ActorSystem actorSystem = null;
+ RpcService rpcService = null;
+ try {
+ actorSystem = AkkaUtils.createActorSystem(
+ new Configuration(), Option.apply(new Tuple2<String, Object>("localhost", 0))); // NOTE(review): port 0 presumably requests any free port - confirm against AkkaUtils
+
+ // we start the RPC service with a very long timeout to ensure that the test
+ // can only pass if the connection problem is not recognized merely via a timeout
+ rpcService = new AkkaRpcService(actorSystem, new Timeout(10000000, TimeUnit.SECONDS));
+
+ Future<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
+
+ Await.result(future, new FiniteDuration(10000000, TimeUnit.SECONDS)); // blocks until the connect future completes or fails
+ fail("should never complete normally"); // reached only if the future completed without throwing
+ }
+ catch (TimeoutException e) {
+ fail("should not fail with a generic timeout exception");
+ }
+ catch (RpcConnectionException e) {
+ // that is what we want; the message must name the address we tried to connect to
+ assertTrue("wrong error message", e.getMessage().contains("foo.bar.com.test.invalid"));
+ }
+ catch (Throwable t) { // also catches the AssertionError thrown by fail() inside the try block
+ fail("wrong exception: " + t);
+ }
+ finally {
+ if (rpcService != null) { // null if an exception was thrown before the service was created
+ rpcService.stopService();
+ }
+ if (actorSystem != null) { // null if actor system creation itself threw
+ actorSystem.shutdown();
+ }
+ }
+ }
+}