You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:21:51 UTC

[11/50] [abbrv] flink git commit: [hotfix] [rpc] Add RpcConnectionTest to validate that connection buildup fails fast when endpoint is unreachable.

[hotfix] [rpc] Add RpcConnectionTest to validate that connection buildup fails fast when endpoint is unreachable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aface1f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aface1f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aface1f3

Branch: refs/heads/flip-6
Commit: aface1f30562b608f4290f9be52fc485c749e0f1
Parents: 564f9a2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 21 13:03:17 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:46:22 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/AsyncCallsTest.java       |  4 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 14 ++--
 .../flink/runtime/rpc/RpcConnectionTest.java    | 86 ++++++++++++++++++++
 3 files changed, 96 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aface1f3/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index e8255d4..7affdb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -43,9 +43,9 @@ public class AsyncCallsTest extends TestLogger {
 	//  shared test members
 	// ------------------------------------------------------------------------
 
-	private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+	private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
 
-	private static AkkaRpcService akkaRpcService =
+	private static final AkkaRpcService akkaRpcService =
 			new AkkaRpcService(actorSystem, Time.milliseconds(10000L));
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/aface1f3/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index ee3f784..53355e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -30,6 +30,7 @@ import org.reflections.Reflections;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -69,7 +70,8 @@ public class RpcCompletenessTest extends TestLogger {
 
 	@SuppressWarnings("rawtypes")
 	private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
-		Method[] gatewayMethods = getRpcMethodsFromGateway(rpcGateway).toArray(new Method[0]);
+		List<Method> rpcMethodsFromGateway = getRpcMethodsFromGateway(rpcGateway);
+		Method[] gatewayMethods = rpcMethodsFromGateway.toArray(new Method[rpcMethodsFromGateway.size()]);
 		Method[] serverMethods = rpcEndpoint.getMethods();
 
 		Map<String, Set<Method>> rpcMethods = new HashMap<>();
@@ -360,13 +362,13 @@ public class RpcCompletenessTest extends TestLogger {
 		}
 
 		// Get all methods declared in current interface
-		for(Method method : interfaceClass.getDeclaredMethods()) {
-			allMethods.add(method);
-		}
+		Collections.addAll(allMethods, interfaceClass.getDeclaredMethods());
 
 		// Get all method inherited from super interface
-		for(Class superClass : interfaceClass.getInterfaces()) {
-			allMethods.addAll(getRpcMethodsFromGateway(superClass));
+		for (Class<?> superClass : interfaceClass.getInterfaces()) {
+			@SuppressWarnings("unchecked")
+			Class<? extends RpcGateway> gatewayClass = (Class<? extends RpcGateway>) superClass;
+			allMethods.addAll(getRpcMethodsFromGateway(gatewayClass));
 		}
 		return allMethods;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aface1f3/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
new file mode 100644
index 0000000..6363662
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Validates that connecting to an unreachable RpcEndpoint address fails with an
+ * RpcConnectionException naming that address, not with a generic timeout.
+ */
+public class RpcConnectionTest {
+
+	@Test
+	public void testConnectFailure() {
+		ActorSystem actorSystem = null;
+		RpcService rpcService = null;
+		try {
+			actorSystem = AkkaUtils.createActorSystem(
+					new Configuration(), Option.apply(new Tuple2<String, Object>("localhost", 0)));
+
+			// Use a very long RPC timeout so the test can only pass when the connection
+			// failure is detected directly rather than surfacing as an expired timeout.
+			rpcService = new AkkaRpcService(actorSystem, new Timeout(10000000, TimeUnit.SECONDS));
+
+			Future<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
+
+			Await.result(future, new FiniteDuration(10000000, TimeUnit.SECONDS));
+			fail("should never complete normally");
+		}
+		catch (TimeoutException e) {
+			fail("should not fail with a generic timeout exception");
+		}
+		catch (RpcConnectionException e) {
+			// Expected outcome: the error message must mention the address we tried to reach.
+			assertTrue("wrong error message", e.getMessage().contains("foo.bar.com.test.invalid"));
+		}
+		catch (Throwable t) { // NOTE(review): also catches the AssertionError raised by fail() inside the try block
+			fail("wrong exception: " + t);
+		}
+		finally {
+			if (rpcService != null) {
+				rpcService.stopService();
+			}
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+		}
+	}
+}