You are viewing a plain-text version of this content. (The canonical link to it, labeled "here" in the original page, was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/04 12:22:26 UTC
[08/18] flink git commit: [FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource
[FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource
This closes #5664.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20bda911
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20bda911
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20bda911
Branch: refs/heads/master
Commit: 20bda911067dc2e2503e4447e498e2ff8731dada
Parents: 916fc20
Author: zentol <ch...@apache.org>
Authored: Mon Feb 26 15:36:37 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed Apr 4 08:59:05 2018 +0200
----------------------------------------------------------------------
.../test/cancelling/CancelingTestBase.java | 133 ++++++-------------
.../test/cancelling/JoinCancelingITCase.java | 9 +-
.../test/cancelling/MapCancelingITCase.java | 7 +-
3 files changed, 45 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/20bda911/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 03ca649..cac16f0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -18,9 +18,10 @@
package org.apache.flink.test.cancelling;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -28,150 +29,100 @@ import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.ClassRule;
import java.util.concurrent.TimeUnit;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-
/**
* Base class for testing job cancellation.
*/
public abstract class CancelingTestBase extends TestLogger {
- private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
-
private static final int MINIMUM_HEAP_SIZE_MB = 192;
- /**
- * Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job
- * is canceled), starting from the point in time when the cancel request is issued.
- */
- private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
-
- private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+ protected static final int PARALLELISM = 4;
// --------------------------------------------------------------------------------------------
- protected LocalFlinkMiniCluster executor;
-
- protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+ @ClassRule
+ public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 2,
+ 4),
+ true);
// --------------------------------------------------------------------------------------------
- private void verifyJvmOptions() {
+ private static void verifyJvmOptions() {
final long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
- @Before
- public void startCluster() throws Exception {
+ private static Configuration getConfiguration() {
verifyJvmOptions();
Configuration config = new Configuration();
config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
- this.executor = new LocalFlinkMiniCluster(config, false);
- this.executor.start();
- }
-
- @After
- public void stopCluster() throws Exception {
- if (this.executor != null) {
- this.executor.stop();
- this.executor = null;
- FileSystem.closeAll();
- System.gc();
- }
+ return config;
}
// --------------------------------------------------------------------------------------------
- public void runAndCancelJob(Plan plan, int msecsTillCanceling) throws Exception {
- runAndCancelJob(plan, msecsTillCanceling, DEFAULT_CANCEL_FINISHED_INTERVAL);
- }
-
- public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
- try {
- // submit job
- final JobGraph jobGraph = getJobGraph(plan);
-
- executor.submitJobDetached(jobGraph);
+ protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
+ // submit job
+ final JobGraph jobGraph = getJobGraph(plan);
- // Wait for the job to make some progress and then cancel
- JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
- executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
- TestingUtils.TESTING_DURATION());
+ ClusterClient<?> client = CLUSTER.getClusterClient();
+ client.setDetached(true);
- Thread.sleep(msecsTillCanceling);
+ JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader());
- FiniteDuration timeout = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS);
+ Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
- ActorGateway jobManager = executor.getLeaderGateway(TestingUtils.TESTING_DURATION());
+ JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
+ Thread.sleep(50);
+ jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ }
+ if (jobStatus != JobStatus.RUNNING) {
+ Assert.fail("Job not in state RUNNING.");
+ }
- Future<Object> ask = jobManager.ask(new CancelJob(jobGraph.getJobID()), timeout);
+ Thread.sleep(msecsTillCanceling);
- Object result = Await.result(ask, timeout);
+ client.cancel(jobSubmissionResult.getJobID());
- if (result instanceof CancellationSuccess) {
- // all good
- } else if (result instanceof CancellationFailure) {
- // Failure
- CancellationFailure failure = (CancellationFailure) result;
- throw new Exception("Failed to cancel job with ID " + failure.jobID() + ".",
- failure.cause());
- } else {
- throw new Exception("Unexpected response to cancel request: " + result);
- }
+ Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();
- // Wait for the job to be cancelled
- JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED,
- executor.getLeaderGateway(TestingUtils.TESTING_DURATION()),
- TestingUtils.TESTING_DURATION());
+ JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
+ Thread.sleep(50);
+ jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(cancelDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
- catch (Exception e) {
- LOG.error("Exception found in runAndCancelJob.", e);
- e.printStackTrace();
- Assert.fail(e.getMessage());
+ if (jobStatusAfterCancel != JobStatus.CANCELED) {
+ Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
}
}
- private JobGraph getJobGraph(final Plan plan) throws Exception {
- final Optimizer pc = new Optimizer(new DataStatistics(), this.executor.configuration());
+ private JobGraph getJobGraph(final Plan plan) {
+ final Optimizer pc = new Optimizer(new DataStatistics(), getConfiguration());
final OptimizedPlan op = pc.compile(plan);
final JobGraphGenerator jgg = new JobGraphGenerator();
return jgg.compileJobGraph(op);
}
-
- public void setTaskManagerNumSlots(int taskManagerNumSlots) {
- this.taskManagerNumSlots = taskManagerNumSlots;
- }
-
- public int getTaskManagerNumSlots() {
- return this.taskManagerNumSlots;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/20bda911/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
index 5e21129..66919e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java
@@ -34,15 +34,10 @@ import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
* Test job cancellation from within a JoinFunction.
*/
public class JoinCancelingITCase extends CancelingTestBase {
- private static final int parallelism = 4;
-
- public JoinCancelingITCase() {
- setTaskManagerNumSlots(parallelism);
- }
// --------------- Test Sort Matches that are canceled while still reading / sorting -----------------
private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow) throws Exception {
- executeTask(joiner, slow, parallelism);
+ executeTask(joiner, slow, PARALLELISM);
}
private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow, int parallelism) throws Exception {
@@ -90,7 +85,7 @@ public class JoinCancelingITCase extends CancelingTestBase {
.with(joiner)
.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
- env.setParallelism(parallelism);
+ env.setParallelism(PARALLELISM);
runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/20bda911/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 3a7039f..13edea4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -31,11 +31,6 @@ import org.junit.Test;
* Test job cancellation from within a MapFunction.
*/
public class MapCancelingITCase extends CancelingTestBase {
- private static final int parallelism = 4;
-
- public MapCancelingITCase() {
- setTaskManagerNumSlots(parallelism);
- }
@Test
public void testMapCancelling() throws Exception {
@@ -65,7 +60,7 @@ public class MapCancelingITCase extends CancelingTestBase {
.map(mapper)
.output(new DiscardingOutputFormat<Integer>());
- env.setParallelism(parallelism);
+ env.setParallelism(PARALLELISM);
runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000);
}