You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:09 UTC

[04/50] [abbrv] flink git commit: [FLINK-4434] [rpc] Add a testing RPC service.

[FLINK-4434] [rpc] Add a testing RPC service.

This closes #2394.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0fa27fcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0fa27fcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0fa27fcb

Branch: refs/heads/flip-6
Commit: 0fa27fcb82a70846958292798a12e133b3fb19cd
Parents: 3112112
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 19 23:29:45 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:41 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/RpcCompletenessTest.java  |   3 +
 .../flink/runtime/rpc/TestingGatewayBase.java   |  85 ++++++++++++++
 .../flink/runtime/rpc/TestingRpcService.java    | 115 +++++++++++++++++++
 3 files changed, 203 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0fa27fcb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index 97cf0cb..b8aad62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -41,9 +41,11 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class RpcCompletenessTest extends TestLogger {
+
 	private static final Class<?> futureClass = Future.class;
 
 	@Test
+	@SuppressWarnings({"rawtypes", "unchecked"})
 	public void testRpcCompleteness() {
 		Reflections reflections = new Reflections("org.apache.flink");
 
@@ -64,6 +66,7 @@ public class RpcCompletenessTest extends TestLogger {
 		}
 	}
 
+	@SuppressWarnings("rawtypes")
 	private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
 		Method[] gatewayMethods = rpcGateway.getDeclaredMethods();
 		Method[] serverMethods = rpcEndpoint.getDeclaredMethods();

http://git-wip-us.apache.org/repos/asf/flink/blob/0fa27fcb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
new file mode 100644
index 0000000..4256135
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.dispatch.Futures;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utility base class for testing gateways
+ */
+public abstract class TestingGatewayBase implements RpcGateway {
+
+	private final ScheduledExecutorService executor;
+
+	protected TestingGatewayBase() {
+		this.executor = Executors.newSingleThreadScheduledExecutor();
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	public void stop() {
+		executor.shutdownNow();
+	}
+
+	@Override
+	protected void finalize() throws Throwable {
+		super.finalize();
+		executor.shutdownNow();
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	public <T> Future<T> futureWithTimeout(long timeoutMillis) {
+		Promise<T> promise = Futures.<T>promise();
+		executor.schedule(new FutureTimeout(promise), timeoutMillis, TimeUnit.MILLISECONDS);
+		return promise.future();
+	}
+
+	// ------------------------------------------------------------------------
+	
+	private static final class FutureTimeout implements Runnable {
+
+		private final Promise<?> promise;
+
+		private FutureTimeout(Promise<?> promise) {
+			this.promise = promise;
+		}
+
+		@Override
+		public void run() {
+			try {
+				promise.failure(new TimeoutException());
+			} catch (Throwable t) {
+				System.err.println("CAUGHT AN ERROR IN THE TEST: " + t.getMessage());
+				t.printStackTrace();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0fa27fcb/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
new file mode 100644
index 0000000..7e92e8d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An RPC Service implementation for testing. This RPC service acts as a replacement for
+ * teh regular RPC service for cases where tests need to return prepared mock gateways instead of
+ * proper RPC gateways.
+ * 
+ * <p>The TestingRpcService can be used for example in the following fashion,
+ * using <i>Mockito</i> for mocks and verification:
+ * 
+ * <pre>{@code
+ * TestingRpcService rpc = new TestingRpcService();
+ *
+ * ResourceManagerGateway testGateway = mock(ResourceManagerGateway.class);
+ * rpc.registerGateway("myAddress", testGateway);
+ * 
+ * MyComponentToTest component = new MyComponentToTest();
+ * component.triggerSomethingThatCallsTheGateway();
+ * 
+ * verify(testGateway, timeout(1000)).theTestMethod(any(UUID.class), anyString());
+ * }</pre>
+ */
+public class TestingRpcService extends AkkaRpcService {
+
+	/** Map of pre-registered connections */
+	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
+
+	/**
+	 * Creates a new {@code TestingRpcService}. 
+	 */
+	public TestingRpcService() {
+		this(new Configuration());
+	}
+
+	/**
+	 * Creates a new {@code TestingRpcService}, using the given configuration. 
+	 */
+	public TestingRpcService(Configuration configuration) {
+		super(AkkaUtils.createLocalActorSystem(configuration), new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
+
+		this.registeredConnections = new ConcurrentHashMap<>();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void stopService() {
+		super.stopService();
+		registeredConnections.clear();
+	}
+
+	// ------------------------------------------------------------------------
+	// connections
+	// ------------------------------------------------------------------------
+
+	public void registerGateway(String address, RpcGateway gateway) {
+		checkNotNull(address);
+		checkNotNull(gateway);
+
+		if (registeredConnections.putIfAbsent(address, gateway) != null) {
+			throw new IllegalStateException("a gateway is already registered under " + address);
+		}
+	}
+
+	@Override
+	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
+		RpcGateway gateway = registeredConnections.get(address);
+
+		if (gateway != null) {
+			if (clazz.isAssignableFrom(gateway.getClass())) {
+				@SuppressWarnings("unchecked")
+				C typedGateway = (C) gateway;
+				return Futures.successful(typedGateway);
+			} else {
+				return Futures.failed(
+						new Exception("Gateway registered under " + address + " is not of type " + clazz));
+			}
+		} else {
+			return Futures.failed(new Exception("No gateway registered under that name"));
+		}
+	}
+}
\ No newline at end of file