You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:46:01 UTC
[19/50] [abbrv] flink git commit: [FLINK-4451] [rpc] Throw
RpcConnectionException when rpc endpoint is not reachable
[FLINK-4451] [rpc] Throw RpcConnectionException when rpc endpoint is not reachable
This PR introduces a RpcConnectionException which is thrown if the rpc endpoint
is not reachable when calling RpcService.connect.
This closes #2405.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be561f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be561f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be561f5
Branch: refs/heads/flip-6
Commit: 3be561f57dab448536e41636997506f5f12aea18
Parents: 6f9936b
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Aug 23 17:59:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:40 2016 +0200
----------------------------------------------------------------------
.../registration/RetryingRegistration.java | 2 +-
.../apache/flink/runtime/rpc/RpcService.java | 7 +++-
.../flink/runtime/rpc/akka/AkkaRpcService.java | 38 +++++++++++-------
.../rpc/exceptions/RpcConnectionException.java | 41 ++++++++++++++++++++
.../runtime/rpc/akka/AkkaRpcActorTest.java | 18 +++++++++
5 files changed, 88 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index 88fe9b5..ea49e42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -197,7 +197,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
@Override
public void onFailure(Throwable failure) {
if (!isCanceled()) {
- log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress);
+ log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure);
startRegistration();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index bc0f5cb..78c1cec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rpc;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
@@ -32,12 +33,14 @@ public interface RpcService {
/**
* Connect to a remote rpc server under the provided address. Returns a rpc gateway which can
- * be used to communicate with the rpc server.
+ * be used to communicate with the rpc server. If the connection failed, then the returned
+ * future is failed with a {@link RpcConnectionException}.
*
* @param address Address of the remote rpc server
* @param clazz Class of the rpc gateway to return
* @param <C> Type of the rpc gateway to return
- * @return Future containing the rpc gateway
+ * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the
+ * connection attempt failed
*/
<C extends RpcGateway> Future<C> connect(String address, Class<C> clazz);
http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 00a6932..060a1ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,25 +100,32 @@ public class AkkaRpcService implements RpcService {
final Future<Object> identify = asker.ask(new Identify(42), timeout);
return identify.map(new Mapper<Object, C>(){
@Override
- public C apply(Object obj) {
- ActorRef actorRef = ((ActorIdentity) obj).getRef();
+ public C checkedApply(Object obj) throws Exception {
- final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+ ActorIdentity actorIdentity = (ActorIdentity) obj;
- InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
+ if (actorIdentity.getRef() == null) {
+ throw new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.');
+ } else {
+ ActorRef actorRef = actorIdentity.getRef();
+
+ final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+ InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
- // Rather than using the System ClassLoader directly, we derive the ClassLoader
- // from this class . That works better in cases where Flink runs embedded and all Flink
- // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
- ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader();
-
- @SuppressWarnings("unchecked")
- C proxy = (C) Proxy.newProxyInstance(
- classLoader,
- new Class<?>[] {clazz},
- akkaInvocationHandler);
+ // Rather than using the System ClassLoader directly, we derive the ClassLoader
+ // from this class . That works better in cases where Flink runs embedded and all Flink
+ // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
+ ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader();
- return proxy;
+ @SuppressWarnings("unchecked")
+ C proxy = (C) Proxy.newProxyInstance(
+ classLoader,
+ new Class<?>[]{clazz},
+ akkaInvocationHandler);
+
+ return proxy;
+ }
}
}, actorSystem.dispatcher());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
new file mode 100644
index 0000000..a22ebe7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Exception class which is thrown if a rpc connection failed. Usually this happens if the remote
+ * host cannot be reached.
+ */
+public class RpcConnectionException extends ExecutionException {
+ private static final long serialVersionUID = -5500560405481142472L;
+
+ public RpcConnectionException(String message) {
+ super(message);
+ }
+
+ public RpcConnectionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RpcConnectionException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 82d13f0..a6ceb91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
@@ -36,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
public class AkkaRpcActorTest extends TestLogger {
@@ -73,6 +75,22 @@ public class AkkaRpcActorTest extends TestLogger {
}
/**
+ * Tests that a {@link RpcConnectionException} is thrown if the rpc endpoint cannot be connected to.
+ */
+ @Test
+ public void testFailingAddressResolution() throws Exception {
+ Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
+
+ try {
+ DummyRpcGateway gateway = Await.result(futureRpcGateway, timeout.duration());
+
+ fail("The rpc connection resolution should have failed.");
+ } catch (RpcConnectionException exception) {
+ // we're expecting a RpcConnectionException
+ }
+ }
+
+ /**
* Tests that the {@link AkkaRpcActor} stashes messages until the corresponding
* {@link RpcEndpoint} has been started.
*/