You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/17 20:48:53 UTC

[flink] branch master updated: [FLINK-11360] [test] Check and remove LocalFlinkMiniClusterITCase

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f012270  [FLINK-11360] [test] Check and remove LocalFlinkMiniClusterITCase
f012270 is described below

commit f012270b1b9fad32e543e7a0195e793ebb402fc5
Author: tison <wa...@gmail.com>
AuthorDate: Thu Jan 17 06:05:20 2019 +0800

    [FLINK-11360] [test] Check and remove LocalFlinkMiniClusterITCase
    
    This closes #7516.
---
 .../runtime/minicluster/MiniClusterITCase.java     |  18 +-
 .../minicluster/LocalFlinkMiniClusterITCase.java   | 186 ---------------------
 2 files changed, 10 insertions(+), 194 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index 585e1ba..e0b5361 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -66,11 +66,12 @@ public class MiniClusterITCase extends TestLogger {
 
 	@Test
 	public void runJobWithSingleRpcService() throws Exception {
-		final int parallelism = 23;
+		final int numOfTMs = 3;
+		final int slotsPerTM = 7;
 
 		final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
-			.setNumTaskManagers(1)
-			.setNumSlotsPerTaskManager(parallelism)
+			.setNumTaskManagers(numOfTMs)
+			.setNumSlotsPerTaskManager(slotsPerTM)
 			.setRpcServiceSharing(RpcServiceSharing.SHARED)
 			.setConfiguration(getDefaultConfiguration())
 			.build();
@@ -78,17 +79,18 @@ public class MiniClusterITCase extends TestLogger {
 		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
 			miniCluster.start();
 
-			miniCluster.executeJobBlocking(getSimpleJob(parallelism));
+			miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM));
 		}
 	}
 
 	@Test
 	public void runJobWithMultipleRpcServices() throws Exception {
-		final int parallelism = 23;
+		final int numOfTMs = 3;
+		final int slotsPerTM = 7;
 
 		final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
-			.setNumTaskManagers(1)
-			.setNumSlotsPerTaskManager(parallelism)
+			.setNumTaskManagers(numOfTMs)
+			.setNumSlotsPerTaskManager(slotsPerTM)
 			.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
 			.setConfiguration(getDefaultConfiguration())
 			.build();
@@ -96,7 +98,7 @@ public class MiniClusterITCase extends TestLogger {
 		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
 			miniCluster.start();
 
-			miniCluster.executeJobBlocking(getSimpleJob(parallelism));
+			miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM));
 		}
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
deleted file mode 100644
index 47bb21c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.runtime.minicluster;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorSystem;
-import akka.actor.RobustActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.TimeoutException;
-
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.impl.ExecutionContextImpl;
-
-import static org.junit.Assert.fail;
-
-/**
- * Integration tests for {@link LocalFlinkMiniCluster}.
- */
-public class LocalFlinkMiniClusterITCase extends TestLogger {
-
-	private static final String[] ALLOWED_THREAD_PREFIXES = {
-		// This is a daemon thread spawned by netty's ThreadLocalRandom class if no
-		// initialSeedUniquifier is set yet and it is sometimes spawned before this test and
-		// sometimes during this test.
-		"initialSeedUniquifierGenerator",
-		// This thread quits only on JVM because of static field
-		// io.netty.buffer.PooledByteBufAllocator.DEFAULT
-		// https://github.com/netty/netty/issues/7759
-		"ObjectCleanerThread"
-	};
-
-	@Test
-	public void testLocalFlinkMiniClusterWithMultipleTaskManagers() throws InterruptedException, TimeoutException {
-
-		final ActorSystem system = RobustActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig());
-		LocalFlinkMiniCluster miniCluster = null;
-
-		final int numTMs = 3;
-		final int numSlots = 14;
-
-		// gather the threads that already exist
-		final Set<Thread> threadsBefore = new HashSet<>();
-		{
-			final Thread[] allThreads = new Thread[Thread.activeCount()];
-			Thread.enumerate(allThreads);
-			threadsBefore.addAll(Arrays.asList(allThreads));
-		}
-
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
-			miniCluster = new LocalFlinkMiniCluster(config, true);
-
-			miniCluster.start();
-
-			final ActorGateway jmGateway = miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
-
-			new JavaTestKit(system) {{
-				final ActorGateway selfGateway = new AkkaActorGateway(getRef(), HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-				new Within(TestingUtils.TESTING_DURATION()) {
-
-					@Override
-					protected void run() {
-						jmGateway.tell(
-								JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-								selfGateway);
-
-						expectMsgEquals(TestingUtils.TESTING_DURATION(), numTMs);
-
-						jmGateway.tell(
-								JobManagerMessages.getRequestTotalNumberOfSlots(),
-								selfGateway);
-
-						expectMsgEquals(TestingUtils.TESTING_DURATION(), numTMs * numSlots);
-					}
-				};
-			}};
-
-		} finally {
-			if (miniCluster != null) {
-				miniCluster.stop();
-				miniCluster.awaitTermination();
-			}
-
-			JavaTestKit.shutdownActorSystem(system);
-			Await.ready(system.whenTerminated(), Duration.Inf());
-		}
-
-		// shut down the global execution context, to make sure it does not affect this testing
-		try {
-			Field f = ExecutionContextImpl.class.getDeclaredField("executor");
-			f.setAccessible(true);
-
-			Object exec = ExecutionContext$.MODULE$.global();
-			ForkJoinPool executor = (ForkJoinPool) f.get(exec);
-			executor.shutdownNow();
-		}
-		catch (Exception e) {
-			System.err.println("Cannot test proper thread shutdown for local execution.");
-			return;
-		}
-
-		// check for remaining threads
-		// we need to check repeatedly for a while, because some threads shut down slowly
-
-		long deadline = System.currentTimeMillis() + 30000;
-		boolean foundThreads = true;
-		String threadName = "";
-
-		while (System.currentTimeMillis() < deadline) {
-			// check that no additional threads remain
-			final Thread[] threadsAfter = new Thread[Thread.activeCount()];
-			Thread.enumerate(threadsAfter);
-
-			foundThreads = false;
-			for (Thread t : threadsAfter) {
-				if (t.isAlive() && !threadsBefore.contains(t)) {
-					// this thread was not there before. check if it is allowed
-					boolean allowed = false;
-					for (String prefix : ALLOWED_THREAD_PREFIXES) {
-						if (t.getName().startsWith(prefix)) {
-							allowed = true;
-							break;
-						}
-					}
-
-					if (!allowed) {
-						foundThreads = true;
-						threadName = t.toString();
-						break;
-					}
-				}
-			}
-
-			if (foundThreads) {
-				try {
-					Thread.sleep(500);
-				} catch (InterruptedException ignored) {}
-			} else {
-				break;
-			}
-		}
-
-		if (foundThreads) {
-			fail("Thread " + threadName + " was started by the mini cluster, but not shut down");
-		}
-	}
-}