You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 12:24:22 UTC

[1/3] flink git commit: [FLINK-9365] [rpc] Add handshake procedure to AkkaRpcService when connecting

Repository: flink
Updated Branches:
  refs/heads/master d5de2bcf1 -> 89ac3dbe4


[FLINK-9365] [rpc] Add handshake procedure to AkkaRpcService when connecting

The handshake procedure sends the version of the connecting (source) component and the
requested rpc gateway type to the target rpc endpoint. The endpoint uses this information
to validate that the two versions are compatible (currently: identical) and that it
supports the requested gateway type.

This closes #6017.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89ac3dbe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89ac3dbe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89ac3dbe

Branch: refs/heads/master
Commit: 89ac3dbe4a66c94ae5af5f06dc92ea2dcd13da57
Parents: 5a093e5
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue May 15 10:55:04 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 14:24:02 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    |  74 +++++++--
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  56 ++++---
 .../runtime/rpc/akka/FencedAkkaRpcActor.java    |  10 +-
 .../akka/exceptions/AkkaHandshakeException.java |  40 +++++
 .../rpc/exceptions/HandshakeException.java      |  38 +++++
 .../rpc/messages/HandshakeSuccessMessage.java   |  26 ++++
 .../rpc/messages/RemoteHandshakeMessage.java    |  53 +++++++
 .../rpc/akka/AkkaRpcActorHandshakeTest.java     | 152 +++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcActorTest.java      |  38 +----
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   4 +
 10 files changed, 415 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89ac3dbe/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 022dea3..8471d7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -22,12 +22,15 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaHandshakeException;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
 import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.rpc.messages.CallAsync;
+import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
 import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
 import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
 import org.apache.flink.util.ExceptionUtils;
@@ -80,10 +83,16 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 
 	private final CompletableFuture<Boolean> terminationFuture;
 
-	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture) {
+	private final int version;
+
+	private State state;
+
+	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture, final int version) {
 		this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
 		this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
 		this.terminationFuture = checkNotNull(terminationFuture);
+		this.version = version;
+		this.state = State.STOPPED;
 	}
 
 	@Override
@@ -120,21 +129,20 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 
 	@Override
 	public void onReceive(final Object message) {
-		if (message.equals(Processing.START)) {
-			getContext().become(
-				(Object msg) -> {
-					if (msg.equals(Processing.STOP)) {
-						getContext().unbecome();
-					} else {
-						mainThreadValidator.enterMainThread();
+		if (message instanceof RemoteHandshakeMessage) {
+			handleHandshakeMessage((RemoteHandshakeMessage) message);
+		} else if (message.equals(Processing.START)) {
+			state = State.STARTED;
+		} else if (message.equals(Processing.STOP)) {
+			state = State.STOPPED;
+		} else if (state == State.STARTED) {
+			mainThreadValidator.enterMainThread();
 
-						try {
-							handleMessage(msg);
-						} finally {
-							mainThreadValidator.exitMainThread();
-						}
-					}
-				});
+			try {
+				handleRpcMessage(message);
+			} finally {
+				mainThreadValidator.exitMainThread();
+			}
 		} else {
 			log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
 				rpcEndpoint.getClass().getName(),
@@ -145,7 +153,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 		}
 	}
 
-	protected void handleMessage(Object message) {
+	protected void handleRpcMessage(Object message) {
 		if (message instanceof RunAsync) {
 			handleRunAsync((RunAsync) message);
 		} else if (message instanceof CallAsync) {
@@ -163,6 +171,35 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 		}
 	}
 
+	private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {
+		if (!isCompatibleVersion(handshakeMessage.getVersion())) {
+			sendErrorIfSender(new AkkaHandshakeException(
+				String.format(
+					"Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.",
+					handshakeMessage.getVersion(),
+					getVersion())));
+		} else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {
+			sendErrorIfSender(new AkkaHandshakeException(
+				String.format(
+					"The rpc endpoint does not support the gateway %s.",
+					handshakeMessage.getRpcGateway().getSimpleName())));
+		} else {
+			getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
+		}
+	}
+
+	private boolean isGatewaySupported(Class<?> rpcGateway) {
+		return rpcGateway.isAssignableFrom(rpcEndpoint.getClass());
+	}
+
+	private boolean isCompatibleVersion(int sourceVersion) {
+		return sourceVersion == getVersion();
+	}
+
+	private int getVersion() {
+		return version;
+	}
+
 	/**
 	 * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
 	 * method with the provided method arguments. If the method has a return value, it is returned
@@ -344,4 +381,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 	protected Object envelopeSelfMessage(Object message) {
 		return message;
 	}
+
+	enum State {
+		STARTED,
+		STOPPED
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89ac3dbe/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 1151389..4ad6541 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
+import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
 
 import akka.actor.ActorIdentity;
 import akka.actor.ActorRef;
@@ -83,6 +85,8 @@ public class AkkaRpcService implements RpcService {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class);
 
+	static final int VERSION = 1;
+
 	static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";
 
 	private final Object lock = new Object();
@@ -140,6 +144,10 @@ public class AkkaRpcService implements RpcService {
 		return actorSystem;
 	}
 
+	protected int getVersion() {
+		return VERSION;
+	}
+
 	@Override
 	public String getAddress() {
 		return address;
@@ -200,9 +208,9 @@ public class AkkaRpcService implements RpcService {
 		final Props akkaRpcActorProps;
 
 		if (rpcEndpoint instanceof FencedRpcEndpoint) {
-			akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture);
+			akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
 		} else {
-			akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
+			akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
 		}
 
 		ActorRef actorRef;
@@ -418,28 +426,38 @@ public class AkkaRpcService implements RpcService {
 
 		final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);
 
-		return identifyFuture.thenApplyAsync(
+		final CompletableFuture<ActorRef> actorRefFuture = identifyFuture.thenApply(
 			(ActorIdentity actorIdentity) -> {
 				if (actorIdentity.getRef() == null) {
 					throw new CompletionException(new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.'));
 				} else {
-					ActorRef actorRef = actorIdentity.getRef();
-
-					InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
-
-					// Rather than using the System ClassLoader directly, we derive the ClassLoader
-					// from this class . That works better in cases where Flink runs embedded and all Flink
-					// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
-					ClassLoader classLoader = getClass().getClassLoader();
-
-					@SuppressWarnings("unchecked")
-					C proxy = (C) Proxy.newProxyInstance(
-						classLoader,
-						new Class<?>[]{clazz},
-						invocationHandler);
-
-					return proxy;
+					return actorIdentity.getRef();
 				}
+			});
+
+		final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
+			(ActorRef actorRef) -> FutureUtils.toJava(
+				Patterns
+					.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), timeout.toMilliseconds())
+					.<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));
+
+		return actorRefFuture.thenCombineAsync(
+			handshakeFuture,
+			(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
+				InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
+
+				// Rather than using the System ClassLoader directly, we derive the ClassLoader
+				// from this class . That works better in cases where Flink runs embedded and all Flink
+				// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
+				ClassLoader classLoader = getClass().getClassLoader();
+
+				@SuppressWarnings("unchecked")
+				C proxy = (C) Proxy.newProxyInstance(
+					classLoader,
+					new Class<?>[]{clazz},
+					invocationHandler);
+
+				return proxy;
 			},
 			actorSystem.dispatcher());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/89ac3dbe/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index d4cc16e..872effd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -39,12 +39,12 @@ import java.util.concurrent.CompletableFuture;
  */
 public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
 
-	public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture) {
-		super(rpcEndpoint, terminationFuture);
+	public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version) {
+		super(rpcEndpoint, terminationFuture, version);
 	}
 
 	@Override
-	protected void handleMessage(Object message) {
+	protected void handleRpcMessage(Object message) {
 		if (message instanceof FencedMessage) {
 
 			final F expectedFencingToken = rpcEndpoint.getFencingToken();
@@ -67,7 +67,7 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo
 				F fencingToken = fencedMessage.getFencingToken();
 
 				if (Objects.equals(expectedFencingToken, fencingToken)) {
-					super.handleMessage(fencedMessage.getPayload());
+					super.handleRpcMessage(fencedMessage.getPayload());
 				} else {
 					if (log.isDebugEnabled()) {
 						log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " +
@@ -81,7 +81,7 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo
 				}
 			}
 		} else if (message instanceof UnfencedMessage) {
-			super.handleMessage(((UnfencedMessage<?>) message).getPayload());
+			super.handleRpcMessage(((UnfencedMessage<?>) message).getPayload());
 		} else {
 			if (log.isDebugEnabled()) {
 				log.debug("Unknown message type: Ignoring message {} because it is neither of type {} nor {}.",

http://git-wip-us.apache.org/repos/asf/flink/blob/89ac3dbe/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaHandshakeException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaHandshakeException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaHandshakeException.java
new file mode 100644
index 0000000..ebb8530
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaHandshakeException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.exceptions;
+
+import org.apache.flink.runtime.rpc.exceptions.HandshakeException;
+
+/**
+ * Exception which is thrown if the handshake fails.
+ */
+public class AkkaHandshakeException extends HandshakeException {
+	private static final long serialVersionUID = 7690464691855200936L;
+
+	public AkkaHandshakeException(String message) {
+		super(message);
+	}
+
+	public AkkaHandshakeException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public AkkaHandshakeException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/89ac3dbe/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/HandshakeException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/HandshakeException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/HandshakeException.java
new file mode 100644
index 0000000..0b434d6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/HandshakeException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+/**
+ * Exception which signals a handshake failure.
+ */
+public class HandshakeException extends RpcConnectionException {
+	private static final long serialVersionUID = -8176772204831111193L;
+
+	public HandshakeException(String message) {
+		super(message);
+	}
+
+	public HandshakeException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public HandshakeException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/89ac3dbe/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/HandshakeSuccessMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/HandshakeSuccessMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/HandshakeSuccessMessage.java
new file mode 100644
index 0000000..f05fce8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/HandshakeSuccessMessage.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+/**
+ * Handshake success response.
+ */
+public enum HandshakeSuccessMessage {
+	INSTANCE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/89ac3dbe/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteHandshakeMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteHandshakeMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteHandshakeMessage.java
new file mode 100644
index 0000000..026822f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteHandshakeMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+/**
+ * Handshake message between rpc endpoints. This message can be used
+ * to verify compatibility between different endpoints.
+ */
+public class RemoteHandshakeMessage implements Serializable {
+
+	private static final long serialVersionUID = -7150082246232019027L;
+
+	@Nonnull
+	private final Class<?> rpcGateway;
+
+	@Nonnull
+	private final int version;
+
+	public RemoteHandshakeMessage(@Nonnull Class<?> rpcGateway, @Nonnull int version) {
+		this.rpcGateway = rpcGateway;
+		this.version = version;
+	}
+
+	@Nonnull
+	public Class<?> getRpcGateway() {
+		return rpcGateway;
+	}
+
+	@Nonnull
+	public int getVersion() {
+		return version;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/89ac3dbe/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
new file mode 100644
index 0000000..ed7a3bd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.exceptions.HandshakeException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the handshake between rpc endpoints.
+ */
+public class AkkaRpcActorHandshakeTest extends TestLogger {
+
+	private static final Time timeout = Time.seconds(10L);
+
+	private static AkkaRpcService akkaRpcService1;
+	private static AkkaRpcService akkaRpcService2;
+	private static WrongVersionAkkaRpcService wrongVersionAkkaRpcService;
+
+	@BeforeClass
+	public static void setupClass() {
+		final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+		final ActorSystem wrongVersionActorSystem = AkkaUtils.createDefaultActorSystem();
+
+		akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
+		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
+		wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(wrongVersionActorSystem, timeout);
+	}
+
+	@AfterClass
+	public static void teardownClass() throws Exception {
+		final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(3);
+
+		terminationFutures.add(akkaRpcService1.stopService());
+		terminationFutures.add(akkaRpcService2.stopService());
+		terminationFutures.add(wrongVersionAkkaRpcService.stopService());
+
+		FutureUtils.waitForAll(terminationFutures).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
+	@Test
+	public void testVersionMatchBetweenRpcComponents() throws Exception {
+		AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint = new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
+		final int value = 42;
+		rpcEndpoint.setFoobar(value);
+
+		rpcEndpoint.start();
+
+		try {
+			final AkkaRpcActorTest.DummyRpcGateway dummyRpcGateway = akkaRpcService2.connect(rpcEndpoint.getAddress(), AkkaRpcActorTest.DummyRpcGateway.class).get();
+
+			assertThat(dummyRpcGateway.foobar().get(), equalTo(value));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
+		}
+	}
+
+	@Test
+	public void testVersionMismatchBetweenRpcComponents() throws Exception {
+		AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint = new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
+
+		rpcEndpoint.start();
+
+		try {
+			try {
+				wrongVersionAkkaRpcService.connect(rpcEndpoint.getAddress(), AkkaRpcActorTest.DummyRpcGateway.class).get();
+				fail("Expected HandshakeException.");
+			} catch (ExecutionException ee) {
+				assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(HandshakeException.class));
+			}
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
+		}
+	}
+
+	/**
+	 * Tests that we receive a HandshakeException when connecting to a rpc endpoint which
+	 * does not support the requested rpc gateway.
+	 */
+	@Test
+	public void testWrongGatewayEndpointConnection() throws Exception {
+		AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint = new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
+
+		rpcEndpoint.start();
+
+		CompletableFuture<WrongRpcGateway> futureGateway = akkaRpcService2.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
+
+		try {
+			futureGateway.get(timeout.getSize(), timeout.getUnit());
+			fail("We expected a HandshakeException.");
+		} catch (ExecutionException executionException) {
+			assertThat(ExceptionUtils.stripExecutionException(executionException), instanceOf(HandshakeException.class));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
+		}
+	}
+
+	private static class WrongVersionAkkaRpcService extends AkkaRpcService {
+
+		WrongVersionAkkaRpcService(ActorSystem actorSystem, Time timeout) {
+			super(actorSystem, timeout);
+		}
+
+		@Override
+		protected int getVersion() {
+			return -1;
+		}
+	}
+
+	private interface WrongRpcGateway extends RpcGateway {
+		CompletableFuture<Boolean> barfoo();
+		void tell(String message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/89ac3dbe/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index a92235c..f4e0156 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -146,35 +146,6 @@ public class AkkaRpcActorTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that we receive a RpcConnectionException when calling a rpc method (with return type)
-	 * on a wrong rpc endpoint.
-	 *
-	 * @throws Exception
-	 */
-	@Test
-	public void testWrongGatewayEndpointConnection() throws Exception {
-		DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
-
-		rpcEndpoint.start();
-
-		CompletableFuture<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
-
-		WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit());
-
-		// since it is a tell operation we won't receive a RpcConnectionException, it's only logged
-		gateway.tell("foobar");
-
-		CompletableFuture<Boolean> result = gateway.barfoo();
-
-		try {
-			result.get(timeout.getSize(), timeout.getUnit());
-			fail("We expected a RpcConnectionException.");
-		} catch (ExecutionException executionException) {
-			assertTrue(executionException.getCause() instanceof RpcConnectionException);
-		}
-	}
-
-	/**
 	 * Tests that we can wait for a RpcEndpoint to terminate.
 	 *
 	 * @throws ExecutionException
@@ -327,15 +298,10 @@ public class AkkaRpcActorTest extends TestLogger {
 	//  Test Actors and Interfaces
 	// ------------------------------------------------------------------------
 
-	private interface DummyRpcGateway extends RpcGateway {
+	interface DummyRpcGateway extends RpcGateway {
 		CompletableFuture<Integer> foobar();
 	}
 
-	private interface WrongRpcGateway extends RpcGateway {
-		CompletableFuture<Boolean> barfoo();
-		void tell(String message);
-	}
-
 	private static class TestRpcEndpoint extends RpcEndpoint {
 
 		protected TestRpcEndpoint(RpcService rpcService) {
@@ -348,7 +314,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 	}
 
-	private static class DummyRpcEndpoint extends TestRpcEndpoint implements DummyRpcGateway {
+	static class DummyRpcEndpoint extends TestRpcEndpoint implements DummyRpcGateway {
 
 		private volatile int _foobar = 42;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89ac3dbe/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index d73ee40..8027c18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -312,4 +312,8 @@ public class AkkaRpcServiceTest extends TestLogger {
 			// expected
 		}
 	}
+
+	@Test
+	public void testVersionIncompatibility() { // TODO(review): empty body, so this test currently verifies nothing
+	}
 }


[3/3] flink git commit: [hotfix] Add JobMasterTest#testSlotRequestTimeoutWhenNoSlotOffering

Posted by tr...@apache.org.
[hotfix] Add JobMasterTest#testSlotRequestTimeoutWhenNoSlotOffering

The JobMasterTest#testSlotRequestTimeoutWhenNoSlotOffering verifies that the JM
retries scheduling a job if one of its TMs does not offer a slot. This behaviour
is triggered by the slot request timeout, which fails the ongoing scheduling
operation if the slot requests are not fulfilled.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17f0e850
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17f0e850
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17f0e850

Branch: refs/heads/master
Commit: 17f0e850fd26da0c50195d3d9daa423ead1fbe3e
Parents: d5de2bc
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 23:57:01 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 14:24:02 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMasterTest.java  | 152 +++++++++++++++++--
 .../resourcemanager/ResourceManagerTest.java    |   4 +-
 .../slotmanager/SlotManagerTest.java            |   3 +-
 .../TestingTaskExecutorGateway.java             |  38 ++---
 .../TestingTaskExecutorGatewayBuilder.java      |  75 +++++++++
 5 files changed, 230 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index c0c9162..d8f33fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -36,12 +39,15 @@ import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -51,14 +57,18 @@ import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.TestLogger;
@@ -78,11 +88,16 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 
 /**
  * Tests for {@link JobMaster}.
@@ -95,9 +110,11 @@ public class JobMasterTest extends TestLogger {
 
 	private static final Time testingTimeout = Time.seconds(10L);
 
-	private static final long heartbeatInterval = 1L;
+	private static final long fastHeartbeatInterval = 1L;
+	private static final long fastHeartbeatTimeout = 5L;
 
-	private static final long heartbeatTimeout = 5L;
+	private static final long heartbeatInterval = 1000L;
+	private static final long heartbeatTimeout = 5000L;
 
 	private static final JobGraph jobGraph = new JobGraph();
 
@@ -105,6 +122,8 @@ public class JobMasterTest extends TestLogger {
 
 	private static HeartbeatServices fastHeartbeatServices;
 
+	private static HeartbeatServices heartbeatServices;
+
 	private BlobServer blobServer;
 
 	private Configuration configuration;
@@ -123,7 +142,8 @@ public class JobMasterTest extends TestLogger {
 	public static void setupClass() {
 		rpcService = new TestingRpcService();
 
-		fastHeartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, rpcService.getScheduledExecutor());
+		fastHeartbeatServices = new TestingHeartbeatServices(fastHeartbeatInterval, fastHeartbeatTimeout, rpcService.getScheduledExecutor());
+		heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, rpcService.getScheduledExecutor());
 	}
 
 	@Before
@@ -157,6 +177,8 @@ public class JobMasterTest extends TestLogger {
 		if (blobServer != null) {
 			blobServer.close();
 		}
+
+		rpcService.clearGateways();
 	}
 
 	@AfterClass
@@ -169,14 +191,13 @@ public class JobMasterTest extends TestLogger {
 
 	@Test
 	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
-		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
-		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
-
 		final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>();
 		final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
-
-		taskExecutorGateway.setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete);
-		taskExecutorGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete)
+			.setDisconnectJobManagerConsumer((jobId, throwable) -> disconnectedJobManagerFuture.complete(jobId))
+			.createTestingTaskExecutorGateway();
 
 		rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
 
@@ -224,7 +245,7 @@ public class JobMasterTest extends TestLogger {
 		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(
 			resourceManagerId,
 			rmResourceId,
-			heartbeatInterval,
+			fastHeartbeatInterval,
 			"localhost",
 			"localhost");
 
@@ -306,7 +327,7 @@ public class JobMasterTest extends TestLogger {
 
 			assertThat(savepointCheckpoint, Matchers.notNullValue());
 
-			assertThat(savepointCheckpoint.getCheckpointID(), Matchers.is(savepointId));
+			assertThat(savepointCheckpoint.getCheckpointID(), is(savepointId));
 		} finally {
 			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
 		}
@@ -356,13 +377,103 @@ public class JobMasterTest extends TestLogger {
 
 			assertThat(savepointCheckpoint, Matchers.notNullValue());
 
-			assertThat(savepointCheckpoint.getCheckpointID(), Matchers.is(checkpointId));
+			assertThat(savepointCheckpoint.getCheckpointID(), is(checkpointId));
 		} finally {
 			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
 		}
 	}
 
 	/**
+	 * Tests that the JobMaster retries the scheduling of a job
+	 * in case of a missing slot offering from a registered TaskExecutor.
+	 */
+	@Test
+	public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception {
+		final JobGraph restartingJobGraph = createSingleVertexJobWithRestartStrategy();
+
+		final long slotRequestTimeout = 10L;
+		configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout);
+
+		final JobMaster jobMaster = createJobMaster(
+			JobMasterConfiguration.fromConfiguration(configuration),
+			restartingJobGraph,
+			haServices,
+			new TestingJobManagerSharedServicesBuilder().build(),
+			heartbeatServices);
+
+		final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+		try {
+			final long start = System.nanoTime();
+			jobMaster.start(JobMasterId.generate(), testingTimeout).get();
+
+			final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+			final ArrayBlockingQueue<SlotRequest> blockingQueue = new ArrayBlockingQueue<>(2);
+			resourceManagerGateway.setRequestSlotConsumer(blockingQueue::offer);
+
+			rpcService.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
+			rmLeaderRetrievalService.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
+
+			// wait for the first slot request
+			blockingQueue.take();
+
+			final CompletableFuture<TaskDeploymentDescriptor> submittedTaskFuture = new CompletableFuture<>();
+			final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+			final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+				.setSubmitTaskConsumer((tdd, ignored) -> {
+					submittedTaskFuture.complete(tdd);
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				})
+				.createTestingTaskExecutorGateway();
+
+			rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
+
+			jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();
+
+			// wait for the next slot request, issued by the retried scheduling after the slot request timeout
+			final SlotRequest slotRequest = blockingQueue.take();
+			final long end = System.nanoTime();
+
+			// we rely on the slot request timeout to fail a stuck scheduling operation
+			assertThat(TimeUnit.NANOSECONDS.toMillis(end - start), Matchers.greaterThanOrEqualTo(slotRequestTimeout));
+
+			assertThat(submittedTaskFuture.isDone(), is(false));
+
+			final SlotOffer slotOffer = new SlotOffer(slotRequest.getAllocationId(), 0, ResourceProfile.UNKNOWN);
+
+			final CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout);
+
+			final Collection<SlotOffer> acceptedSlots = acceptedSlotsFuture.get();
+
+			assertThat(acceptedSlots, hasSize(1));
+			final SlotOffer acceptedSlot = acceptedSlots.iterator().next();
+
+			assertThat(acceptedSlot.getAllocationId(), equalTo(slotRequest.getAllocationId()));
+
+			// wait for the deployed task
+			final TaskDeploymentDescriptor taskDeploymentDescriptor = submittedTaskFuture.get();
+
+			assertThat(taskDeploymentDescriptor.getAllocationId(), equalTo(slotRequest.getAllocationId()));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
+	private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
+		final JobVertex jobVertex = new JobVertex("Test vertex");
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+		final JobGraph singleVertexJobGraph = new JobGraph(jobVertex);
+		singleVertexJobGraph.setAllowQueuedScheduling(true);
+		singleVertexJobGraph.setExecutionConfig(executionConfig);
+
+		return singleVertexJobGraph;
+	}
+
+	/**
 	 * Tests that we can close an unestablished ResourceManager connection.
 	 */
 	@Test
@@ -450,6 +561,21 @@ public class JobMasterTest extends TestLogger {
 			JobGraph jobGraph,
 			HighAvailabilityServices highAvailabilityServices,
 			JobManagerSharedServices jobManagerSharedServices) throws Exception {
+		return createJobMaster(
+			jobMasterConfiguration,
+			jobGraph,
+			highAvailabilityServices,
+			jobManagerSharedServices,
+			fastHeartbeatServices);
+	}
+
+	@Nonnull
+	private JobMaster createJobMaster(
+		JobMasterConfiguration jobMasterConfiguration,
+		JobGraph jobGraph,
+		HighAvailabilityServices highAvailabilityServices,
+		JobManagerSharedServices jobManagerSharedServices,
+		HeartbeatServices heartbeatServices) throws Exception {
 		return new JobMaster(
 			rpcService,
 			jobMasterConfiguration,
@@ -457,7 +583,7 @@ public class JobMasterTest extends TestLogger {
 			jobGraph,
 			highAvailabilityServices,
 			jobManagerSharedServices,
-			fastHeartbeatServices,
+			heartbeatServices,
 			blobServer,
 			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 			new NoOpOnCompletionActions(),

http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 7dab685..ebcb295 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.New;
@@ -105,7 +105,7 @@ public class ResourceManagerTest extends TestLogger {
 		try {
 			final ResourceID taskManagerId = ResourceID.generate();
 			final ResourceManagerGateway resourceManagerGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-			final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+			final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 
 			// first make the ResourceManager the leader
 			resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();

http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 59de473..7f6736c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.testutils.category.New;
@@ -1091,7 +1092,7 @@ public class SlotManagerTest extends TestLogger {
 	public void testReportAllocatedSlot() throws Exception {
 		final ResourceID taskManagerId = ResourceID.generate();
 		final ResourceActions resourceActions = mock(ResourceActions.class);
-		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway);
 
 		try (final SlotManager slotManager = new SlotManager(

http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 43b0be2..1c48307 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -40,6 +39,8 @@ import org.apache.flink.util.Preconditions;
 import org.junit.experimental.categories.Category;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 
 /**
@@ -52,25 +53,18 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	private final String hostname;
 
-	private volatile Consumer<ResourceID> heartbeatJobManagerConsumer;
+	private final Consumer<ResourceID> heartbeatJobManagerConsumer;
 
-	private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
+	private final BiConsumer<JobID, Throwable> disconnectJobManagerConsumer;
 
-	public TestingTaskExecutorGateway() {
-		this("foobar:1234", "foobar");
-	}
+	private final BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer;
 
-	public TestingTaskExecutorGateway(String address, String hostname) {
+	public TestingTaskExecutorGateway(String address, String hostname, Consumer<ResourceID> heartbeatJobManagerConsumer, BiConsumer<JobID, Throwable> disconnectJobManagerConsumer, BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer) {
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
-	}
-
-	public void setHeartbeatJobManagerConsumer(Consumer<ResourceID> heartbeatJobManagerConsumer) {
-		this.heartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
-	}
-
-	public void setDisconnectJobManagerConsumer(Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer) {
-		this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
+		this.heartbeatJobManagerConsumer = Preconditions.checkNotNull(heartbeatJobManagerConsumer);
+		this.disconnectJobManagerConsumer = Preconditions.checkNotNull(disconnectJobManagerConsumer);
+		this.submitTaskConsumer = Preconditions.checkNotNull(submitTaskConsumer);
 	}
 
 	@Override
@@ -91,7 +85,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
-		return CompletableFuture.completedFuture(Acknowledge.get());
+		return submitTaskConsumer.apply(tdd, jobMasterId);
 	}
 
 	@Override
@@ -126,11 +120,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
-		final Consumer<ResourceID> currentHeartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
-
-		if (currentHeartbeatJobManagerConsumer != null) {
-			currentHeartbeatJobManagerConsumer.accept(heartbeatOrigin);
-		}
+		heartbeatJobManagerConsumer.accept(heartbeatOrigin);
 	}
 
 	@Override
@@ -140,11 +130,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public void disconnectJobManager(JobID jobId, Exception cause) {
-		final Consumer<Tuple2<JobID, Throwable>> currentDisconnectJobManagerConsumer = disconnectJobManagerConsumer;
-
-		if (currentDisconnectJobManagerConsumer != null) {
-			currentDisconnectJobManagerConsumer.accept(Tuple2.of(jobId, cause));
-		}
+		disconnectJobManagerConsumer.accept(jobId, cause);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/17f0e850/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
new file mode 100644
index 0000000..2550abf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+/**
+ * Builder for a {@link TestingTaskExecutorGateway}; callbacks that are not set default to no-ops and submitted tasks are acknowledged immediately.
+ */
+public class TestingTaskExecutorGatewayBuilder {
+
+	private static final Consumer<ResourceID> NOOP_HEARTBEAT_JOBMANAGER_CONSUMER = ignored -> {};
+	private static final BiConsumer<JobID, Throwable> NOOP_DISCONNECT_JOBMANAGER_CONSUMER = (ignoredA, ignoredB) -> {};
+	private static final BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> NOOP_SUBMIT_TASK_CONSUMER = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get());
+
+	private String address = "foobar:1234";
+	private String hostname = "foobar";
+	private Consumer<ResourceID> heartbeatJobManagerConsumer = NOOP_HEARTBEAT_JOBMANAGER_CONSUMER;
+	private BiConsumer<JobID, Throwable> disconnectJobManagerConsumer = NOOP_DISCONNECT_JOBMANAGER_CONSUMER;
+	private BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer = NOOP_SUBMIT_TASK_CONSUMER;
+
+	public TestingTaskExecutorGatewayBuilder setAddress(String address) {
+		this.address = address;
+		return this;
+	}
+
+	public TestingTaskExecutorGatewayBuilder setHostname(String hostname) {
+		this.hostname = hostname;
+		return this;
+	}
+
+	public TestingTaskExecutorGatewayBuilder setHeartbeatJobManagerConsumer(Consumer<ResourceID> heartbeatJobManagerConsumer) {
+		this.heartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
+		return this;
+	}
+
+	public TestingTaskExecutorGatewayBuilder setDisconnectJobManagerConsumer(BiConsumer<JobID, Throwable> disconnectJobManagerConsumer) {
+		this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
+		return this;
+	}
+
+	public TestingTaskExecutorGatewayBuilder setSubmitTaskConsumer(BiFunction<TaskDeploymentDescriptor, JobMasterId, CompletableFuture<Acknowledge>> submitTaskConsumer) {
+		this.submitTaskConsumer = submitTaskConsumer;
+		return this;
+	}
+
+	public TestingTaskExecutorGateway createTestingTaskExecutorGateway() {
+		return new TestingTaskExecutorGateway(address, hostname, heartbeatJobManagerConsumer, disconnectJobManagerConsumer, submitTaskConsumer);
+	}
+}


[2/3] flink git commit: [hotfix] Resolve compiler warnings in AkkaRpcService

Posted by tr...@apache.org.
[hotfix] Resolve compiler warnings in AkkaRpcService


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a093e5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a093e5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a093e5f

Branch: refs/heads/master
Commit: 5a093e5f90a142396f7978ee1d8ec51379cbdde6
Parents: 17f0e85
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue May 15 10:41:15 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 14:24:02 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 26 ++++++++++----------
 .../flink/runtime/rpc/RpcConnectionTest.java    |  2 +-
 2 files changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5a093e5f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 8e96492..1151389 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -44,7 +44,6 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +60,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -68,6 +68,7 @@ import java.util.function.Function;
 
 import scala.Option;
 import scala.concurrent.Future;
+import scala.reflect.ClassTag$;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -378,7 +379,7 @@ public class AkkaRpcService implements RpcService {
 
 	@Override
 	public <T> CompletableFuture<T> execute(Callable<T> callable) {
-		Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher());
+		Future<T> scalaFuture = Futures.<T>future(callable, actorSystem.dispatcher());
 
 		return FutureUtils.toJava(scalaFuture);
 	}
@@ -411,15 +412,16 @@ public class AkkaRpcService implements RpcService {
 
 		final ActorSelection actorSel = actorSystem.actorSelection(address);
 
-		final Future<Object> identify = Patterns.ask(actorSel, new Identify(42), timeout.toMilliseconds());
-		final Future<C> resultFuture = identify.map(new Mapper<Object, C>(){
-			@Override
-			public C checkedApply(Object obj) throws Exception {
+		final Future<ActorIdentity> identify = Patterns
+			.ask(actorSel, new Identify(42), timeout.toMilliseconds())
+			.<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));
 
-				ActorIdentity actorIdentity = (ActorIdentity) obj;
+		final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);
 
+		return identifyFuture.thenApplyAsync(
+			(ActorIdentity actorIdentity) -> {
 				if (actorIdentity.getRef() == null) {
-					throw new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.');
+					throw new CompletionException(new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.'));
 				} else {
 					ActorRef actorRef = actorIdentity.getRef();
 
@@ -428,7 +430,7 @@ public class AkkaRpcService implements RpcService {
 					// Rather than using the System ClassLoader directly, we derive the ClassLoader
 					// from this class . That works better in cases where Flink runs embedded and all Flink
 					// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
-					ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader();
+					ClassLoader classLoader = getClass().getClassLoader();
 
 					@SuppressWarnings("unchecked")
 					C proxy = (C) Proxy.newProxyInstance(
@@ -438,9 +440,7 @@ public class AkkaRpcService implements RpcService {
 
 					return proxy;
 				}
-			}
-		}, actorSystem.dispatcher());
-
-		return FutureUtils.toJava(resultFuture);
+			},
+			actorSystem.dispatcher());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5a093e5f/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index 78137e8..2b88a17 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -58,7 +58,7 @@ public class RpcConnectionTest extends TestLogger {
 		RpcService rpcService = null;
 		try {
 			actorSystem = AkkaUtils.createActorSystem(
-					new Configuration(), Option.apply(new Tuple2<String, Object>("localhost", 0)));
+					new Configuration(), Option.<Tuple2<String, Object>>apply(new Tuple2<>("localhost", 0)));
 
 			// we start the RPC service with a very long timeout to ensure that the test
 			// can only pass if the connection problem is not recognized merely via a timeout