You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:46:01 UTC

[19/50] [abbrv] flink git commit: [FLINK-4451] [rpc] Throw RpcConnectionException when rpc endpoint is not reachable

[FLINK-4451] [rpc] Throw RpcConnectionException when rpc endpoint is not reachable

This PR introduces a RpcConnectionException which is thrown if the rpc endpoint
is not reachable when calling RpcService.connect.

This closes #2405.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be561f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be561f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be561f5

Branch: refs/heads/flip-6
Commit: 3be561f57dab448536e41636997506f5f12aea18
Parents: 6f9936b
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Aug 23 17:59:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:40 2016 +0200

----------------------------------------------------------------------
 .../registration/RetryingRegistration.java      |  2 +-
 .../apache/flink/runtime/rpc/RpcService.java    |  7 +++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 38 +++++++++++-------
 .../rpc/exceptions/RpcConnectionException.java  | 41 ++++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 18 +++++++++
 5 files changed, 88 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index 88fe9b5..ea49e42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -197,7 +197,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 				@Override
 				public void onFailure(Throwable failure) {
 					if (!isCanceled()) {
-						log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress);
+						log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure);
 						startRegistration();
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index bc0f5cb..78c1cec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
@@ -32,12 +33,14 @@ public interface RpcService {
 
 	/**
 	 * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can
-	 * be used to communicate with the rpc server.
+	 * be used to communicate with the rpc server. If the connection failed, then the returned
+	 * future is failed with a {@link RpcConnectionException}.
 	 *
 	 * @param address Address of the remote rpc server
 	 * @param clazz Class of the rpc gateway to return
 	 * @param <C> Type of the rpc gateway to return
-	 * @return Future containing the rpc gateway
+	 * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the
+	 * connection attempt failed
 	 */
 	<C extends RpcGateway> Future<C> connect(String address, Class<C> clazz);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 00a6932..060a1ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,25 +100,32 @@ public class AkkaRpcService implements RpcService {
 		final Future<Object> identify = asker.ask(new Identify(42), timeout);
 		return identify.map(new Mapper<Object, C>(){
 			@Override
-			public C apply(Object obj) {
-				ActorRef actorRef = ((ActorIdentity) obj).getRef();
+			public C checkedApply(Object obj) throws Exception {
 
-				final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+				ActorIdentity actorIdentity = (ActorIdentity) obj;
 
-				InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
+				if (actorIdentity.getRef() == null) {
+					throw new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.');
+				} else {
+					ActorRef actorRef = actorIdentity.getRef();
+
+					final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+					InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
 
-				// Rather than using the System ClassLoader directly, we derive the ClassLoader
-				// from this class . That works better in cases where Flink runs embedded and all Flink
-				// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
-				ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader();
-				
-				@SuppressWarnings("unchecked")
-				C proxy = (C) Proxy.newProxyInstance(
-					classLoader,
-					new Class<?>[] {clazz},
-					akkaInvocationHandler);
+					// Rather than using the System ClassLoader directly, we derive the ClassLoader
+					// from this class . That works better in cases where Flink runs embedded and all Flink
+					// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
+					ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader();
 
-				return proxy;
+					@SuppressWarnings("unchecked")
+					C proxy = (C) Proxy.newProxyInstance(
+						classLoader,
+						new Class<?>[]{clazz},
+						akkaInvocationHandler);
+
+					return proxy;
+				}
 			}
 		}, actorSystem.dispatcher());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
new file mode 100644
index 0000000..a22ebe7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Exception class which is thrown if a rpc connection failed. Usually this happens if the remote
+ * host cannot be reached.
+ */
+public class RpcConnectionException extends ExecutionException {
+	private static final long serialVersionUID = -5500560405481142472L;
+
+	public RpcConnectionException(String message) {
+		super(message);
+	}
+
+	public RpcConnectionException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public RpcConnectionException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 82d13f0..a6ceb91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.TestLogger;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
@@ -36,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 public class AkkaRpcActorTest extends TestLogger {
 
@@ -73,6 +75,22 @@ public class AkkaRpcActorTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that a {@link RpcConnectionException} is thrown if the rpc endpoint cannot be connected to.
+	 */
+	@Test
+	public void testFailingAddressResolution() throws Exception {
+		Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
+
+		try {
+			DummyRpcGateway gateway = Await.result(futureRpcGateway, timeout.duration());
+
+			fail("The rpc connection resolution should have failed.");
+		} catch (RpcConnectionException exception) {
+			// we're expecting a RpcConnectionException
+		}
+	}
+
+	/**
 	 * Tests that the {@link AkkaRpcActor} stashes messages until the corresponding
 	 * {@link RpcEndpoint} has been started.
 	 */