You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 07:57:02 UTC

[flink] branch master updated (20fff57 -> bb64862)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 20fff57  [FLINK-11048] Mark new RemoteStreamEnvironment constructor PublicEvolving
     new a8c1f7c  [hotfix][tests] Fix checkstyle violations in AkkaRpcServiceTest
     new bb64862  [FLINK-11356][tests] Port JobManagerStartupTest to new code base

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/runtime/blob/BlobServerTest.java  |  76 +++++++++++
 .../clusterframework/BootstrapToolsTest.java       |  26 ++++
 .../runtime/jobmanager/JobManagerStartupTest.java  | 148 ---------------------
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java | 109 ++++++---------
 4 files changed, 142 insertions(+), 217 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerTest.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java


[flink] 02/02: [FLINK-11356][tests] Port JobManagerStartupTest to new code base

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb64862b8b7e3ee35625bfe8cb003e8eada8e949
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 15:48:56 2019 +0100

    [FLINK-11356][tests] Port JobManagerStartupTest to new code base
    
    - Moved JobManagerStartupTest#testStartupWithPortInUse to
    BootstrapToolsTest#testActorSystemInstantiationFailureWhenPortOccupied
    
    - Moved JobManagerStartupTest#testJobManagerStartupFails to
    BlobServerTest#testFailureIfStorageDirectoryCannotBeCreated
    
    This closes #7541.
---
 .../apache/flink/runtime/blob/BlobServerTest.java  |  76 +++++++++++
 .../clusterframework/BootstrapToolsTest.java       |  26 ++++
 .../runtime/jobmanager/JobManagerStartupTest.java  | 148 ---------------------
 3 files changed, 102 insertions(+), 148 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerTest.java
new file mode 100644
index 0000000..cc335ca
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeFalse;
+
+/**
+ * Tests for the {@link BlobServer}.
+ */
+public class BlobServerTest extends TestLogger {
+
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Tests that the {@link BlobServer} fails if the blob storage directory
+	 * cannot be created.
+	 */
+	@Test
+	public void testFailureIfStorageDirectoryCannotBeCreated() throws IOException {
+		final Configuration configuration = new Configuration();
+		final File blobStorageDirectory = createNonWritableDirectory();
+
+		final String nonExistDirectory = new File(blobStorageDirectory, "does_not_exist_for_sure").getAbsolutePath();
+		configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, nonExistDirectory);
+
+		try (BlobServer ignored = new BlobServer(configuration, new VoidBlobStore())) {
+			fail("Expected that the BlobServer initialization fails.");
+		} catch (IOException expected) {
+			// expected
+		}
+	}
+
+	@Nonnull
+	private File createNonWritableDirectory() throws IOException {
+		assumeFalse(OperatingSystem.isWindows()); // skip on Windows, where setWritable doesn't work
+		final File blobStorageDirectory = temporaryFolder.newFolder();
+		assertTrue(blobStorageDirectory.setExecutable(true, false));
+		assertTrue(blobStorageDirectory.setReadable(true, false));
+		assertTrue(blobStorageDirectory.setWritable(false, false));
+		return blobStorageDirectory;
+	}
+
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 805e970..54aadf5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.CheckedSupplier;
@@ -32,6 +33,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -42,9 +46,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link BootstrapToolsTest}.
@@ -360,4 +367,23 @@ public class BootstrapToolsTest extends TestLogger {
 			ExecutorUtils.gracefulShutdown(10000L, TimeUnit.MILLISECONDS, executorService);
 		}
 	}
+
+	/**
+	 * Tests that starting the {@link ActorSystem} fails with an expressive exception
+	 * (a {@link BindException} in the cause chain) if the requested port is occupied.
+	 */
+	@Test
+	public void testActorSystemInstantiationFailureWhenPortOccupied() throws Exception {
+		final ServerSocket portOccupier = new ServerSocket(0, 10, InetAddress.getByName("0.0.0.0"));
+
+		try {
+			final int port = portOccupier.getLocalPort();
+			BootstrapTools.startActorSystem(new Configuration(), "0.0.0.0", port, LOG);
+			fail("Expected to fail with a BindException");
+		} catch (Exception e) {
+			assertThat(ExceptionUtils.findThrowable(e, BindException.class).isPresent(), is(true));
+		} finally {
+			portOccupier.close();
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
deleted file mode 100644
index e535415..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import static org.junit.Assert.*;
-import static org.junit.Assume.assumeTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.StartupUtils;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Tests that verify the startup behavior of the JobManager in failure
- * situations, when the JobManager cannot be started.
- */
-public class JobManagerStartupTest extends TestLogger {
-
-	private final static String DOES_NOT_EXISTS_NO_SIR = "does-not-exist-no-sir";
-
-	private File blobStorageDirectory;
-
-	@Rule
-	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws IOException {
-		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
-
-		// Prepare test directory
-		blobStorageDirectory = temporaryFolder.newFolder();
-
-		assertTrue(blobStorageDirectory.setExecutable(true, false));
-		assertTrue(blobStorageDirectory.setReadable(true, false));
-		assertTrue(blobStorageDirectory.setWritable(false, false));
-	}
-
-	@After
-	public void after() {
-		// Cleanup test directory
-		if (blobStorageDirectory != null) {
-			assertTrue(blobStorageDirectory.delete());
-		}
-	}
-
-	/**
-	 * Verifies that the JobManager fails fast (and with expressive error message)
-	 * when the port to listen is already in use.
-	 * @throws Throwable 
-	 */
-	@Test( expected = BindException.class )
-	public void testStartupWithPortInUse() throws BindException {
-		
-		ServerSocket portOccupier;
-		final int portNum;
-		
-		try {
-			portNum = NetUtils.getAvailablePort();
-			portOccupier = new ServerSocket(portNum, 10, InetAddress.getByName("0.0.0.0"));
-		}
-		catch (Throwable t) {
-			// could not find free port, or open a connection there
-			return;
-		}
-		
-		try {
-			JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER, "localhost", portNum);
-			fail("this should throw an exception");
-		}
-		catch (Exception e) {
-			// expected
-			List<Throwable> causes = StartupUtils.getExceptionCauses(e, new ArrayList<Throwable>());
-			for(Throwable cause:causes) {
-				if(cause instanceof BindException) {
-					throw (BindException) cause;
-				}	
-			}
-			throw e;
-		}
-		finally {
-			try {
-				portOccupier.close();
-			}
-			catch (Throwable t) {
-				// ignore
-			}
-		}
-	}
-	
-	/**
-	 * Verifies that the JobManager fails fast (and with expressive error message)
-	 * when one of its components (here the BLOB server) fails to start properly.
-	 */
-	@Test
-	public void testJobManagerStartupFails() {
-		final int portNum;
-		try {
-			portNum = NetUtils.getAvailablePort();
-		}
-		catch (Throwable t) {
-			// skip test if we cannot find a free port
-			return;
-		}
-		Configuration failConfig = new Configuration();
-		String nonExistDirectory = new File(blobStorageDirectory, DOES_NOT_EXISTS_NO_SIR).getAbsolutePath();
-		failConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, nonExistDirectory);
-
-		try {
-			JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER, "localhost", portNum);
-			fail("this should fail with an exception");
-		}
-		catch (Exception e) {
-			// expected
-		}
-	}
-}


[flink] 01/02: [hotfix][tests] Fix checkstyle violations in AkkaRpcServiceTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8c1f7c4c05330e25b68ddc520b7cf2bd8e90576
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 15:27:56 2019 +0100

    [hotfix][tests] Fix checkstyle violations in AkkaRpcServiceTest
---
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java | 109 ++++++++-------------
 1 file changed, 40 insertions(+), 69 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 7b15d30..caf22f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -31,7 +31,6 @@ import org.junit.AfterClass;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -44,27 +43,29 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link AkkaRpcService}.
+ */
 public class AkkaRpcServiceTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
 	//  shared test members
 	// ------------------------------------------------------------------------
 
-	private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+	private static final ActorSystem ACTOR_SYSTEM = AkkaUtils.createDefaultActorSystem();
 
-	private static final Time timeout = Time.milliseconds(10000);
+	private static final Time TIMEOUT = Time.milliseconds(10000L);
 
-	private static AkkaRpcService akkaRpcService =
-			new AkkaRpcService(actorSystem, timeout);
+	private static final AkkaRpcService AKKA_RPC_SERVICE = new AkkaRpcService(ACTOR_SYSTEM, TIMEOUT);
 
 	@AfterClass
 	public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
-		final CompletableFuture<Void> rpcTerminationFuture = akkaRpcService.stopService();
-		final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
+		final CompletableFuture<Void> rpcTerminationFuture = AKKA_RPC_SERVICE.stopService();
+		final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(ACTOR_SYSTEM.terminate());
 
 		FutureUtils
 			.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
-			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 	}
 
 	// ------------------------------------------------------------------------
@@ -74,15 +75,10 @@ public class AkkaRpcServiceTest extends TestLogger {
 	@Test
 	public void testScheduleRunnable() throws Exception {
 		final OneShotLatch latch = new OneShotLatch();
-		final long delay = 100;
+		final long delay = 100L;
 		final long start = System.nanoTime();
 
-		ScheduledFuture<?> scheduledFuture = akkaRpcService.scheduleRunnable(new Runnable() {
-			@Override
-			public void run() {
-				latch.trigger();
-			}
-		}, delay, TimeUnit.MILLISECONDS);
+		ScheduledFuture<?> scheduledFuture = AKKA_RPC_SERVICE.scheduleRunnable(latch::trigger, delay, TimeUnit.MILLISECONDS);
 
 		scheduledFuture.get();
 
@@ -93,13 +89,13 @@ public class AkkaRpcServiceTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that the {@link AkkaRpcService} can execute runnables
+	 * Tests that the {@link AkkaRpcService} can execute runnables.
 	 */
 	@Test
 	public void testExecuteRunnable() throws Exception {
 		final OneShotLatch latch = new OneShotLatch();
 
-		akkaRpcService.execute(() -> latch.trigger());
+		AKKA_RPC_SERVICE.execute(latch::trigger);
 
 		latch.await(30L, TimeUnit.SECONDS);
 	}
@@ -109,16 +105,13 @@ public class AkkaRpcServiceTest extends TestLogger {
 	 * a {@link CompletableFuture}.
 	 */
 	@Test
-	public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
+	public void testExecuteCallable() throws Exception {
 		final OneShotLatch latch = new OneShotLatch();
 		final int expected = 42;
 
-		CompletableFuture<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
-			@Override
-			public Integer call() throws Exception {
-				latch.trigger();
-				return expected;
-			}
+		CompletableFuture<Integer> result = AKKA_RPC_SERVICE.execute(() -> {
+			latch.trigger();
+			return expected;
 		});
 
 		int actual = result.get(30L, TimeUnit.SECONDS);
@@ -129,16 +122,16 @@ public class AkkaRpcServiceTest extends TestLogger {
 
 	@Test
 	public void testGetAddress() {
-		assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress());
+		assertEquals(AkkaUtils.getAddress(ACTOR_SYSTEM).host().get(), AKKA_RPC_SERVICE.getAddress());
 	}
 
 	@Test
 	public void testGetPort() {
-		assertEquals(AkkaUtils.getAddress(actorSystem).port().get(), akkaRpcService.getPort());
+		assertEquals(AkkaUtils.getAddress(ACTOR_SYSTEM).port().get(), AKKA_RPC_SERVICE.getPort());
 	}
 
 	/**
-	 * Tests that we can wait for the termination of the rpc service
+	 * Tests that we can wait for the termination of the rpc service.
 	 */
 	@Test(timeout = 60000)
 	public void testTerminationFuture() throws Exception {
@@ -159,18 +152,13 @@ public class AkkaRpcServiceTest extends TestLogger {
 	 * service.
 	 */
 	@Test(timeout = 60000)
-	public void testScheduledExecutorServiceSimpleSchedule() throws ExecutionException, InterruptedException {
-		ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+	public void testScheduledExecutorServiceSimpleSchedule() throws Exception {
+		ScheduledExecutor scheduledExecutor = AKKA_RPC_SERVICE.getScheduledExecutor();
 
 		final OneShotLatch latch = new OneShotLatch();
 
 		ScheduledFuture<?> future = scheduledExecutor.schedule(
-			new Runnable() {
-				@Override
-				public void run() {
-					latch.trigger();
-				}
-			},
+			latch::trigger,
 			10L,
 			TimeUnit.MILLISECONDS);
 
@@ -185,8 +173,8 @@ public class AkkaRpcServiceTest extends TestLogger {
 	 * rate.
 	 */
 	@Test(timeout = 60000)
-	public void testScheduledExecutorServicePeriodicSchedule() throws ExecutionException, InterruptedException {
-		ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+	public void testScheduledExecutorServicePeriodicSchedule() throws Exception {
+		ScheduledExecutor scheduledExecutor = AKKA_RPC_SERVICE.getScheduledExecutor();
 
 		final int tries = 4;
 		final long delay = 10L;
@@ -195,12 +183,7 @@ public class AkkaRpcServiceTest extends TestLogger {
 		long currentTime = System.nanoTime();
 
 		ScheduledFuture<?> future = scheduledExecutor.scheduleAtFixedRate(
-			new Runnable() {
-				@Override
-				public void run() {
-					countDownLatch.countDown();
-				}
-			},
+			countDownLatch::countDown,
 			delay,
 			delay,
 			TimeUnit.MILLISECONDS);
@@ -225,8 +208,8 @@ public class AkkaRpcServiceTest extends TestLogger {
 	 * delay.
 	 */
 	@Test(timeout = 60000)
-	public void testScheduledExecutorServiceWithFixedDelaySchedule() throws ExecutionException, InterruptedException {
-		ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+	public void testScheduledExecutorServiceWithFixedDelaySchedule() throws Exception {
+		ScheduledExecutor scheduledExecutor = AKKA_RPC_SERVICE.getScheduledExecutor();
 
 		final int tries = 4;
 		final long delay = 10L;
@@ -235,12 +218,7 @@ public class AkkaRpcServiceTest extends TestLogger {
 		long currentTime = System.nanoTime();
 
 		ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(
-			new Runnable() {
-				@Override
-				public void run() {
-					countDownLatch.countDown();
-				}
-			},
+			countDownLatch::countDown,
 			delay,
 			delay,
 			TimeUnit.MILLISECONDS);
@@ -265,7 +243,7 @@ public class AkkaRpcServiceTest extends TestLogger {
 	 */
 	@Test
 	public void testScheduledExecutorServiceCancelWithFixedDelay() throws InterruptedException {
-		ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+		ScheduledExecutor scheduledExecutor = AKKA_RPC_SERVICE.getScheduledExecutor();
 
 		long delay = 10L;
 
@@ -274,20 +252,17 @@ public class AkkaRpcServiceTest extends TestLogger {
 		final OneShotLatch shouldNotBeTriggeredLatch = new OneShotLatch();
 
 		ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(
-			new Runnable() {
-				@Override
-				public void run() {
-					try {
-						if (!futureTask.isTriggered()) {
-							// first run
-							futureTask.trigger();
-							latch.await();
-						} else {
-							shouldNotBeTriggeredLatch.trigger();
-						}
-					} catch (InterruptedException e) {
-						// ignore
+			() -> {
+				try {
+					if (futureTask.isTriggered()) {
+						shouldNotBeTriggeredLatch.trigger();
+					} else {
+						// first run
+						futureTask.trigger();
+						latch.await();
 					}
+				} catch (InterruptedException ignored) {
+					// ignore
 				}
 			},
 			delay,
@@ -309,8 +284,4 @@ public class AkkaRpcServiceTest extends TestLogger {
 			// expected
 		}
 	}
-
-	@Test
-	public void testVersionIncompatibility() {
-	}
 }