You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 12:25:16 UTC

[3/3] flink git commit: [FLINK-9365] [rpc] Add handshake procedure to AkkaRpcService when connecting

[FLINK-9365] [rpc] Add handshake procedure to AkkaRpcService when connecting

The handshake procedure sends the source version and the target rpc gateway type
to the rpc endpoint. This information is used to validate whether the version is
compatible and whether the rpc endpoint supports the target gateway type.

This closes #6017.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f07f36fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f07f36fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f07f36fc

Branch: refs/heads/release-1.5
Commit: f07f36fcf754eb675de329e61cfe49c109e70571
Parents: f7bb8f5
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue May 15 10:55:04 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 14:24:37 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    |  74 +++++++--
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  56 ++++---
 .../runtime/rpc/akka/FencedAkkaRpcActor.java    |  10 +-
 .../akka/exceptions/AkkaHandshakeException.java |  40 +++++
 .../rpc/exceptions/HandshakeException.java      |  38 +++++
 .../rpc/messages/HandshakeSuccessMessage.java   |  26 ++++
 .../rpc/messages/RemoteHandshakeMessage.java    |  53 +++++++
 .../rpc/akka/AkkaRpcActorHandshakeTest.java     | 152 +++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcActorTest.java      |  38 +----
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   4 +
 10 files changed, 415 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f07f36fc/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 022dea3..8471d7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -22,12 +22,15 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaHandshakeException;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
 import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.rpc.messages.CallAsync;
+import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
 import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
 import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
 import org.apache.flink.util.ExceptionUtils;
@@ -80,10 +83,16 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 
 	private final CompletableFuture<Boolean> terminationFuture;
 
-	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture) {
+	private final int version;
+
+	private State state;
+
+	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture, final int version) {
 		this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
 		this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
 		this.terminationFuture = checkNotNull(terminationFuture);
+		this.version = version;
+		this.state = State.STOPPED;
 	}
 
 	@Override
@@ -120,21 +129,20 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 
 	@Override
 	public void onReceive(final Object message) {
-		if (message.equals(Processing.START)) {
-			getContext().become(
-				(Object msg) -> {
-					if (msg.equals(Processing.STOP)) {
-						getContext().unbecome();
-					} else {
-						mainThreadValidator.enterMainThread();
+		if (message instanceof RemoteHandshakeMessage) {
+			handleHandshakeMessage((RemoteHandshakeMessage) message);
+		} else if (message.equals(Processing.START)) {
+			state = State.STARTED;
+		} else if (message.equals(Processing.STOP)) {
+			state = State.STOPPED;
+		} else if (state == State.STARTED) {
+			mainThreadValidator.enterMainThread();
 
-						try {
-							handleMessage(msg);
-						} finally {
-							mainThreadValidator.exitMainThread();
-						}
-					}
-				});
+			try {
+				handleRpcMessage(message);
+			} finally {
+				mainThreadValidator.exitMainThread();
+			}
 		} else {
 			log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
 				rpcEndpoint.getClass().getName(),
@@ -145,7 +153,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 		}
 	}
 
-	protected void handleMessage(Object message) {
+	protected void handleRpcMessage(Object message) {
 		if (message instanceof RunAsync) {
 			handleRunAsync((RunAsync) message);
 		} else if (message instanceof CallAsync) {
@@ -163,6 +171,35 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 		}
 	}
 
+	private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {
+		if (!isCompatibleVersion(handshakeMessage.getVersion())) {
+			sendErrorIfSender(new AkkaHandshakeException(
+				String.format(
+					"Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.",
+					handshakeMessage.getVersion(),
+					getVersion())));
+		} else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {
+			sendErrorIfSender(new AkkaHandshakeException(
+				String.format(
+					"The rpc endpoint does not support the gateway %s.",
+					handshakeMessage.getRpcGateway().getSimpleName())));
+		} else {
+			getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
+		}
+	}
+
+	private boolean isGatewaySupported(Class<?> rpcGateway) {
+		return rpcGateway.isAssignableFrom(rpcEndpoint.getClass());
+	}
+
+	private boolean isCompatibleVersion(int sourceVersion) {
+		return sourceVersion == getVersion();
+	}
+
+	private int getVersion() {
+		return version;
+	}
+
 	/**
 	 * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
 	 * method with the provided method arguments. If the method has a return value, it is returned
@@ -344,4 +381,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 	protected Object envelopeSelfMessage(Object message) {
 		return message;
 	}
+
+	enum State {
+		STARTED,
+		STOPPED
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f07f36fc/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 1151389..4ad6541 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
+import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
 
 import akka.actor.ActorIdentity;
 import akka.actor.ActorRef;
@@ -83,6 +85,8 @@ public class AkkaRpcService implements RpcService {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class);
 
+	static final int VERSION = 1;
+
 	static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";
 
 	private final Object lock = new Object();
@@ -140,6 +144,10 @@ public class AkkaRpcService implements RpcService {
 		return actorSystem;
 	}
 
+	protected int getVersion() {
+		return VERSION;
+	}
+
 	@Override
 	public String getAddress() {
 		return address;
@@ -200,9 +208,9 @@ public class AkkaRpcService implements RpcService {
 		final Props akkaRpcActorProps;
 
 		if (rpcEndpoint instanceof FencedRpcEndpoint) {
-			akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture);
+			akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
 		} else {
-			akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
+			akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
 		}
 
 		ActorRef actorRef;
@@ -418,28 +426,38 @@ public class AkkaRpcService implements RpcService {
 
 		final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);
 
-		return identifyFuture.thenApplyAsync(
+		final CompletableFuture<ActorRef> actorRefFuture = identifyFuture.thenApply(
 			(ActorIdentity actorIdentity) -> {
 				if (actorIdentity.getRef() == null) {
 					throw new CompletionException(new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.'));
 				} else {
-					ActorRef actorRef = actorIdentity.getRef();
-
-					InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
-
-					// Rather than using the System ClassLoader directly, we derive the ClassLoader
-					// from this class . That works better in cases where Flink runs embedded and all Flink
-					// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
-					ClassLoader classLoader = getClass().getClassLoader();
-
-					@SuppressWarnings("unchecked")
-					C proxy = (C) Proxy.newProxyInstance(
-						classLoader,
-						new Class<?>[]{clazz},
-						invocationHandler);
-
-					return proxy;
+					return actorIdentity.getRef();
 				}
+			});
+
+		final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
+			(ActorRef actorRef) -> FutureUtils.toJava(
+				Patterns
+					.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), timeout.toMilliseconds())
+					.<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));
+
+		return actorRefFuture.thenCombineAsync(
+			handshakeFuture,
+			(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
+				InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
+
+				// Rather than using the System ClassLoader directly, we derive the ClassLoader
+				// from this class . That works better in cases where Flink runs embedded and all Flink
+				// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
+				ClassLoader classLoader = getClass().getClassLoader();
+
+				@SuppressWarnings("unchecked")
+				C proxy = (C) Proxy.newProxyInstance(
+					classLoader,
+					new Class<?>[]{clazz},
+					invocationHandler);
+
+				return proxy;
 			},
 			actorSystem.dispatcher());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f07f36fc/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index d4cc16e..872effd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -39,12 +39,12 @@ import java.util.concurrent.CompletableFuture;
  */
 public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
 
-	public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture) {
-		super(rpcEndpoint, terminationFuture);
+	public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version) {
+		super(rpcEndpoint, terminationFuture, version);
 	}
 
 	@Override
-	protected void handleMessage(Object message) {
+	protected void handleRpcMessage(Object message) {
 		if (message instanceof FencedMessage) {
 
 			final F expectedFencingToken = rpcEndpoint.getFencingToken();
@@ -67,7 +67,7 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo
 				F fencingToken = fencedMessage.getFencingToken();
 
 				if (Objects.equals(expectedFencingToken, fencingToken)) {
-					super.handleMessage(fencedMessage.getPayload());
+					super.handleRpcMessage(fencedMessage.getPayload());
 				} else {
 					if (log.isDebugEnabled()) {
 						log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " +
@@ -81,7 +81,7 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo
 				}
 			}
 		} else if (message instanceof UnfencedMessage) {
-			super.handleMessage(((UnfencedMessage<?>) message).getPayload());
+			super.handleRpcMessage(((UnfencedMessage<?>) message).getPayload());
 		} else {
 			if (log.isDebugEnabled()) {
 				log.debug("Unknown message type: Ignoring message {} because it is neither of type {} nor {}.",

http://git-wip-us.apache.org/repos/asf/flink/blob/f07f36fc/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaHandshakeException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaHandshakeException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaHandshakeException.java
new file mode 100644
index 0000000..ebb8530
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaHandshakeException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.exceptions;
+
+import org.apache.flink.runtime.rpc.exceptions.HandshakeException;
+
+/**
+ * Exception which is thrown if the handshake fails.
+ */
+public class AkkaHandshakeException extends HandshakeException {
+	private static final long serialVersionUID = 7690464691855200936L;
+
+	public AkkaHandshakeException(String message) {
+		super(message);
+	}
+
+	public AkkaHandshakeException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public AkkaHandshakeException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f07f36fc/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/HandshakeException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/HandshakeException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/HandshakeException.java
new file mode 100644
index 0000000..0b434d6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/HandshakeException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+/**
+ * Exception which signals a handshake failure.
+ */
+public class HandshakeException extends RpcConnectionException {
+	private static final long serialVersionUID = -8176772204831111193L;
+
+	public HandshakeException(String message) {
+		super(message);
+	}
+
+	public HandshakeException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public HandshakeException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f07f36fc/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/HandshakeSuccessMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/HandshakeSuccessMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/HandshakeSuccessMessage.java
new file mode 100644
index 0000000..f05fce8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/HandshakeSuccessMessage.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+/**
+ * Handshake success response.
+ */
+public enum HandshakeSuccessMessage {
+	INSTANCE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f07f36fc/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteHandshakeMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteHandshakeMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteHandshakeMessage.java
new file mode 100644
index 0000000..026822f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteHandshakeMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+/**
+ * Handshake message between rpc endpoints. This message can be used
+ * to verify compatibility between different endpoints.
+ */
+public class RemoteHandshakeMessage implements Serializable {
+
+	private static final long serialVersionUID = -7150082246232019027L;
+
+	@Nonnull
+	private final Class<?> rpcGateway;
+
+	@Nonnull
+	private final int version;
+
+	public RemoteHandshakeMessage(@Nonnull Class<?> rpcGateway, @Nonnull int version) {
+		this.rpcGateway = rpcGateway;
+		this.version = version;
+	}
+
+	@Nonnull
+	public Class<?> getRpcGateway() {
+		return rpcGateway;
+	}
+
+	@Nonnull
+	public int getVersion() {
+		return version;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f07f36fc/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
new file mode 100644
index 0000000..ed7a3bd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.exceptions.HandshakeException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the handshake between rpc endpoints.
+ */
+public class AkkaRpcActorHandshakeTest extends TestLogger {
+
+	private static final Time timeout = Time.seconds(10L);
+
+	private static AkkaRpcService akkaRpcService1;
+	private static AkkaRpcService akkaRpcService2;
+	private static WrongVersionAkkaRpcService wrongVersionAkkaRpcService;
+
+	@BeforeClass
+	public static void setupClass() {
+		final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+		final ActorSystem wrongVersionActorSystem = AkkaUtils.createDefaultActorSystem();
+
+		akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
+		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
+		wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(wrongVersionActorSystem, timeout);
+	}
+
+	@AfterClass
+	public static void teardownClass() throws Exception {
+		final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(3);
+
+		terminationFutures.add(akkaRpcService1.stopService());
+		terminationFutures.add(akkaRpcService2.stopService());
+		terminationFutures.add(wrongVersionAkkaRpcService.stopService());
+
+		FutureUtils.waitForAll(terminationFutures).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
+	@Test
+	public void testVersionMatchBetweenRpcComponents() throws Exception {
+		AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint = new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
+		final int value = 42;
+		rpcEndpoint.setFoobar(value);
+
+		rpcEndpoint.start();
+
+		try {
+			final AkkaRpcActorTest.DummyRpcGateway dummyRpcGateway = akkaRpcService2.connect(rpcEndpoint.getAddress(), AkkaRpcActorTest.DummyRpcGateway.class).get();
+
+			assertThat(dummyRpcGateway.foobar().get(), equalTo(value));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
+		}
+	}
+
+	@Test
+	public void testVersionMismatchBetweenRpcComponents() throws Exception {
+		AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint = new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
+
+		rpcEndpoint.start();
+
+		try {
+			try {
+				wrongVersionAkkaRpcService.connect(rpcEndpoint.getAddress(), AkkaRpcActorTest.DummyRpcGateway.class).get();
+				fail("Expected HandshakeException.");
+			} catch (ExecutionException ee) {
+				assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(HandshakeException.class));
+			}
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
+		}
+	}
+
+	/**
+	 * Tests that we receive a HandshakeException when connecting to a rpc endpoint which
+	 * does not support the requested rpc gateway.
+	 */
+	@Test
+	public void testWrongGatewayEndpointConnection() throws Exception {
+		AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint = new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
+
+		rpcEndpoint.start();
+
+		CompletableFuture<WrongRpcGateway> futureGateway = akkaRpcService2.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
+
+		try {
+			futureGateway.get(timeout.getSize(), timeout.getUnit());
+			fail("We expected a HandshakeException.");
+		} catch (ExecutionException executionException) {
+			assertThat(ExceptionUtils.stripExecutionException(executionException), instanceOf(HandshakeException.class));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
+		}
+	}
+
+	private static class WrongVersionAkkaRpcService extends AkkaRpcService {
+
+		WrongVersionAkkaRpcService(ActorSystem actorSystem, Time timeout) {
+			super(actorSystem, timeout);
+		}
+
+		@Override
+		protected int getVersion() {
+			return -1;
+		}
+	}
+
+	private interface WrongRpcGateway extends RpcGateway {
+		CompletableFuture<Boolean> barfoo();
+		void tell(String message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f07f36fc/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index a92235c..f4e0156 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -146,35 +146,6 @@ public class AkkaRpcActorTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that we receive a RpcConnectionException when calling a rpc method (with return type)
-	 * on a wrong rpc endpoint.
-	 *
-	 * @throws Exception
-	 */
-	@Test
-	public void testWrongGatewayEndpointConnection() throws Exception {
-		DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
-
-		rpcEndpoint.start();
-
-		CompletableFuture<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
-
-		WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit());
-
-		// since it is a tell operation we won't receive a RpcConnectionException, it's only logged
-		gateway.tell("foobar");
-
-		CompletableFuture<Boolean> result = gateway.barfoo();
-
-		try {
-			result.get(timeout.getSize(), timeout.getUnit());
-			fail("We expected a RpcConnectionException.");
-		} catch (ExecutionException executionException) {
-			assertTrue(executionException.getCause() instanceof RpcConnectionException);
-		}
-	}
-
-	/**
 	 * Tests that we can wait for a RpcEndpoint to terminate.
 	 *
 	 * @throws ExecutionException
@@ -327,15 +298,10 @@ public class AkkaRpcActorTest extends TestLogger {
 	//  Test Actors and Interfaces
 	// ------------------------------------------------------------------------
 
-	private interface DummyRpcGateway extends RpcGateway {
+	interface DummyRpcGateway extends RpcGateway {
 		CompletableFuture<Integer> foobar();
 	}
 
-	private interface WrongRpcGateway extends RpcGateway {
-		CompletableFuture<Boolean> barfoo();
-		void tell(String message);
-	}
-
 	private static class TestRpcEndpoint extends RpcEndpoint {
 
 		protected TestRpcEndpoint(RpcService rpcService) {
@@ -348,7 +314,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 	}
 
-	private static class DummyRpcEndpoint extends TestRpcEndpoint implements DummyRpcGateway {
+	static class DummyRpcEndpoint extends TestRpcEndpoint implements DummyRpcGateway {
 
 		private volatile int _foobar = 42;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f07f36fc/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index d73ee40..8027c18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -312,4 +312,8 @@ public class AkkaRpcServiceTest extends TestLogger {
 			// expected
 		}
 	}
+
+	@Test
+	public void testVersionIncompatibility() {
+	}
 }