You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text version).
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 12:22:26 UTC

[08/18] flink git commit: [FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource

[FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource

This closes #5664.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20bda911
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20bda911
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20bda911

Branch: refs/heads/master
Commit: 20bda911067dc2e2503e4447e498e2ff8731dada
Parents: 916fc20
Author: zentol <ch...@apache.org>
Authored: Mon Feb 26 15:36:37 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200

----------------------------------------------------------------------
 .../test/cancelling/CancelingTestBase.java      | 133 ++++++-------------
 .../test/cancelling/JoinCancelingITCase.java    |   9 +-
 .../test/cancelling/MapCancelingITCase.java     |   7 +-
 3 files changed, 45 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/20bda911/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 03ca649..cac16f0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.test.cancelling;
 
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -28,150 +29,100 @@ import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.ClassRule;
 
 import java.util.concurrent.TimeUnit;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-
 /**
  * Base class for testing job cancellation.
  */
 public abstract class CancelingTestBase extends TestLogger {
 
-	private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
-
 	private static final int MINIMUM_HEAP_SIZE_MB = 192;
 
-	/**
-	 * Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job
-	 * is canceled), starting from the point in time when the cancel request is issued.
-	 */
-	private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
-
-	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+	protected static final int PARALLELISM = 4;
 
 	// --------------------------------------------------------------------------------------------
 
-	protected LocalFlinkMiniCluster executor;
-
-	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+	@ClassRule
+	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			getConfiguration(),
+			2,
+			4),
+		true);
 
 	// --------------------------------------------------------------------------------------------
 
-	private void verifyJvmOptions() {
+	private static void verifyJvmOptions() {
 		final long heap = Runtime.getRuntime().maxMemory() >> 20;
 		Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
 				+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
 	}
 
-	@Before
-	public void startCluster() throws Exception {
+	private static Configuration getConfiguration() {
 		verifyJvmOptions();
 		Configuration config = new Configuration();
 		config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 		config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
 
-		this.executor = new LocalFlinkMiniCluster(config, false);
-		this.executor.start();
-	}
-
-	@After
-	public void stopCluster() throws Exception {
-		if (this.executor != null) {
-			this.executor.stop();
-			this.executor = null;
-			FileSystem.closeAll();
-			System.gc();
-		}
+		return config;
 	}
 
 	// --------------------------------------------------------------------------------------------
 
-	public void runAndCancelJob(Plan plan, int msecsTillCanceling) throws Exception {
-		runAndCancelJob(plan, msecsTillCanceling, DEFAULT_CANCEL_FINISHED_INTERVAL);
-	}
-
-	public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
-		try {
-			// submit job
-			final JobGraph jobGraph = getJobGraph(plan);
-
-			executor.submitJobDetached(jobGraph);
+	protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
+		// submit job
+		final JobGraph jobGraph = getJobGraph(plan);
 
-			// Wait for the job to make some progress and then cancel
-			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
-					executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-					TestingUtils.TESTING_DURATION());
+		ClusterClient<?> client = CLUSTER.getClusterClient();
+		client.setDetached(true);
 
-			Thread.sleep(msecsTillCanceling);
+		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader());
 
-			FiniteDuration timeout = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS);
+		Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
 
-			ActorGateway jobManager = executor.getLeaderGateway(TestingUtils.TESTING_DURATION());
+		JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
+			Thread.sleep(50);
+			jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		}
+		if (jobStatus != JobStatus.RUNNING) {
+			Assert.fail("Job not in state RUNNING.");
+		}
 
-			Future<Object> ask = jobManager.ask(new CancelJob(jobGraph.getJobID()), timeout);
+		Thread.sleep(msecsTillCanceling);
 
-			Object result = Await.result(ask, timeout);
+		client.cancel(jobSubmissionResult.getJobID());
 
-			if (result instanceof CancellationSuccess) {
-				// all good
-			} else if (result instanceof CancellationFailure) {
-				// Failure
-				CancellationFailure failure = (CancellationFailure) result;
-				throw new Exception("Failed to cancel job with ID " + failure.jobID() + ".",
-						failure.cause());
-			} else {
-				throw new Exception("Unexpected response to cancel request: " + result);
-			}
+		Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
 
-			// Wait for the job to be cancelled
-			JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED,
-					executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-					TestingUtils.TESTING_DURATION());
+		JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
+			Thread.sleep(50);
+			jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 		}
-		catch (Exception e) {
-			LOG.error("Exception found in runAndCancelJob.", e);
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
+		if (jobStatusAfterCancel != JobStatus.CANCELED) {
+			Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
 		}
 	}
 
-	private JobGraph getJobGraph(final Plan plan) throws Exception {
-		final Optimizer pc = new Optimizer(new DataStatistics(), this.executor.configuration());
+	private JobGraph getJobGraph(final Plan plan) {
+		final Optimizer pc = new Optimizer(new DataStatistics(), getConfiguration());
 		final OptimizedPlan op = pc.compile(plan);
 		final JobGraphGenerator jgg = new JobGraphGenerator();
 		return jgg.compileJobGraph(op);
 	}
-
-	public void setTaskManagerNumSlots(int taskManagerNumSlots) {
-		this.taskManagerNumSlots = taskManagerNumSlots;
-	}
-
-	public int getTaskManagerNumSlots() {
-		return this.taskManagerNumSlots;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/20bda911/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index 5e21129..66919e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -34,15 +34,10 @@ import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
  * Test job cancellation from within a JoinFunction.
  */
 public class JoinCancelingITCase extends CancelingTestBase {
-	private static final int parallelism = 4;
-
-	public JoinCancelingITCase() {
-		setTaskManagerNumSlots(parallelism);
-	}
 
 	// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
 	private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow) throws Exception {
-		executeTask(joiner, slow, parallelism);
+		executeTask(joiner, slow, PARALLELISM);
 	}
 
 	private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow, int parallelism) throws Exception {
@@ -90,7 +85,7 @@ public class JoinCancelingITCase extends CancelingTestBase {
 				.with(joiner)
 				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
 
-		env.setParallelism(parallelism);
+		env.setParallelism(PARALLELISM);
 
 		runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/20bda911/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 3a7039f..13edea4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -31,11 +31,6 @@ import org.junit.Test;
  * Test job cancellation from within a MapFunction.
  */
 public class MapCancelingITCase extends CancelingTestBase {
-	private static final int parallelism = 4;
-
-	public MapCancelingITCase() {
-		setTaskManagerNumSlots(parallelism);
-	}
 
 	@Test
 	public void testMapCancelling() throws Exception {
@@ -65,7 +60,7 @@ public class MapCancelingITCase extends CancelingTestBase {
 				.map(mapper)
 				.output(new DiscardingOutputFormat<Integer>());
 
-		env.setParallelism(parallelism);
+		env.setParallelism(PARALLELISM);
 
 		runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000);
 	}