You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:49 UTC

[30/52] [abbrv] flink git commit: [FLINK-4954] [rpc] Discard messages when AkkaRpcActor is in state Processing.STOP

[FLINK-4954] [rpc] Discard messages when AkkaRpcActor is in state Processing.STOP

When the AkkaRpcActor receives a message while being in state Processing.STOP it will discard
it and send an AkkaRpcException back to the caller. This replaces the old stashing behaviour
which had the problem that it was just a best effort approach to keep all received messages.
Distributed components should not rely on this behaviour. That's why it was replaced with discarding
messages.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/006a19d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/006a19d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/006a19d4

Branch: refs/heads/master
Commit: 006a19d4e601a0917e073215d866b5b87dce375f
Parents: 522edae
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 28 14:07:28 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:25 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 23 ++++++-----
 .../rpc/akka/exceptions/AkkaRpcException.java   | 41 ++++++++++++++++++++
 .../rpc/exceptions/RpcConnectionException.java  |  4 +-
 .../runtime/rpc/exceptions/RpcException.java    | 39 +++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 21 ++++++++--
 5 files changed, 112 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/006a19d4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index c21383a..fe6b23b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import akka.actor.UntypedActorWithStash;
+import akka.actor.UntypedActor;
 import akka.dispatch.Futures;
 import akka.japi.Procedure;
 import akka.pattern.Patterns;
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
 import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.Processing;
@@ -60,14 +61,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * in the context of the actor thread.
  * <p>
  * The {@link Processing} message controls the processing behaviour of the akka rpc actor. A
- * {@link Processing#START} message unstashes all stashed messages and starts processing incoming
- * messages. A {@link Processing#STOP} message stops processing messages and stashes incoming
- * messages.
+ * {@link Processing#START} starts processing incoming messages. A {@link Processing#STOP} message
+ * stops processing messages. All messages which arrive when the processing is stopped, will be
+ * discarded.
  *
  * @param <C> Type of the {@link RpcGateway} associated with the {@link RpcEndpoint}
  * @param <T> Type of the {@link RpcEndpoint}
  */
-class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActorWithStash {
+class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActor {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class);
 
@@ -86,7 +87,7 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 	}
 
 	@Override
-	public void postStop() {
+	public void postStop() throws Exception {
 		super.postStop();
 
 		// IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise
@@ -99,7 +100,6 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 	@Override
 	public void onReceive(final Object message) {
 		if (message.equals(Processing.START)) {
-			unstashAll();
 			getContext().become(new Procedure<Object>() {
 				@Override
 				public void apply(Object msg) throws Exception {
@@ -111,10 +111,15 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 				}
 			});
 		} else {
-			LOG.info("The rpc endpoint {} has not been started yet. Stashing message {} until processing is started.",
+			LOG.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
 				rpcEndpoint.getClass().getName(),
 				message.getClass().getName());
-			stash();
+
+			if (!getSender().equals(ActorRef.noSender())) {
+				// fail a possible future if we have a sender
+				getSender().tell(new Status.Failure(new AkkaRpcException("Discard message, because " +
+					"the rpc endpoint has not been started yet.")), getSelf());
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/006a19d4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaRpcException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaRpcException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaRpcException.java
new file mode 100644
index 0000000..f0d6548
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaRpcException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.exceptions;
+
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
+
+/**
+ * Base class for Akka RPC related exceptions.
+ */
+public class AkkaRpcException extends RpcException {
+
+	private static final long serialVersionUID = -3796329968494146418L;
+
+	public AkkaRpcException(String message) {
+		super(message);
+	}
+
+	public AkkaRpcException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public AkkaRpcException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/006a19d4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
index a22ebe7..4eaf34f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.runtime.rpc.exceptions;
 
-import java.util.concurrent.ExecutionException;
-
 /**
  * Exception class which is thrown if a rpc connection failed. Usually this happens if the remote
  * host cannot be reached.
  */
-public class RpcConnectionException extends ExecutionException {
+public class RpcConnectionException extends RpcException {
 	private static final long serialVersionUID = -5500560405481142472L;
 
 	public RpcConnectionException(String message) {

http://git-wip-us.apache.org/repos/asf/flink/blob/006a19d4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcException.java
new file mode 100644
index 0000000..652b3f5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+/**
+ * Base class for RPC related exceptions.
+ */
+public class RpcException extends Exception {
+
+	private static final long serialVersionUID = -7163591879289483630L;
+
+	public RpcException(String message) {
+		super(message);
+	}
+
+	public RpcException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public RpcException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/006a19d4/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index d2dbab7..760e1a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.TestLogger;
 import org.hamcrest.core.Is;
@@ -95,26 +96,38 @@ public class AkkaRpcActorTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that the {@link AkkaRpcActor} stashes messages until the corresponding
+	 * Tests that the {@link AkkaRpcActor} discards messages until the corresponding
 	 * {@link RpcEndpoint} has been started.
 	 */
 	@Test
-	public void testMessageStashing() throws Exception {
+	public void testMessageDiscarding() throws Exception {
 		int expectedValue = 1337;
 
 		DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 
 		DummyRpcGateway rpcGateway = rpcEndpoint.getSelf();
 
-		// this message should not be processed until we've started the rpc endpoint
+		// this message should be discarded and completed with an AkkaRpcException
 		Future<Integer> result = rpcGateway.foobar();
 
+		try {
+			result.get(timeout.getSize(), timeout.getUnit());
+			fail("Expected an AkkaRpcException.");
+		} catch (ExecutionException ee) {
+			// expected this exception, because the endpoint has not been started
+			assertTrue(ee.getCause() instanceof AkkaRpcException);
+		}
+
 		// set a new value which we expect to be returned
 		rpcEndpoint.setFoobar(expectedValue);
 
-		// now process the rpc
+		// start the endpoint so that it can process messages
 		rpcEndpoint.start();
 
+		// send the rpc again
+		result = rpcGateway.foobar();
+
+		// now we should receive a result :-)
 		Integer actualValue = result.get(timeout.getSize(), timeout.getUnit());
 
 		assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue));