You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/03/09 08:54:59 UTC
[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/5664
[FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource
## What is the purpose of the change
Ports the {{CancelingTestBase}} to use {{MiniClusterResource}}.
## Verifying this change
Run `MapCancelingTestBase` with `flip6` profile enabled/disabled.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 8703_canceling
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5664.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5664
----
commit 57338df4819b2324f7ede2b131f81d83bc9096b2
Author: zentol <ch...@...>
Date: 2018-02-26T14:36:37Z
[FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource
commit 22d4a2f02c256eb41a1684a5766a1dd53dc9351d
Author: zentol <ch...@...>
Date: 2018-02-28T12:43:42Z
[hotfix][tests] Properly disable JoinCancelingITCase
----
---
[GitHub] flink issue #5664: [FLINK-8703][tests] Port CancelingTestBase to MiniCluster...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5664
merging.
---
[GitHub] flink issue #5664: [FLINK-8703][tests] Port CancelingTestBase to MiniCluster...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/5664
Interesting, there is a aborted test run with a core dump.
---
[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5664#discussion_r177794734
--- Diff: flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java ---
@@ -18,160 +18,111 @@
package org.apache.flink.test.cancelling;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.ClassRule;
import java.util.concurrent.TimeUnit;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-
/**
* Base class for testing job cancellation.
*/
public abstract class CancelingTestBase extends TestLogger {
- private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
-
private static final int MINIMUM_HEAP_SIZE_MB = 192;
- /**
- * Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job
- * is canceled), starting from the point in time when the cancel request is issued.
- */
- private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
-
- private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+ protected static final int PARALLELISM = 4;
// --------------------------------------------------------------------------------------------
- protected LocalFlinkMiniCluster executor;
-
- protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+ @ClassRule
+ public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 2,
--- End diff --
Why do we start a mini cluster with 2 TMs and 4 slots per TM? Wouldn't a single TM be sufficient?
---
[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5664#discussion_r177801072
--- Diff: flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java ---
@@ -30,19 +30,18 @@
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
+import org.junit.Ignore;
+import org.junit.Test;
+
/**
* Test job cancellation from within a JoinFunction.
*/
+@Ignore("Takes too long.")
--- End diff --
No, I also didn't check how long it actually takes.
---
[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5664
---
[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5664#discussion_r177794320
--- Diff: flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java ---
@@ -30,19 +30,18 @@
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
+import org.junit.Ignore;
+import org.junit.Test;
+
/**
* Test job cancellation from within a JoinFunction.
*/
+@Ignore("Takes too long.")
--- End diff --
Do you know what's taking so long?
---
[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5664#discussion_r177800863
--- Diff: flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java ---
@@ -18,160 +18,111 @@
package org.apache.flink.test.cancelling;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.ClassRule;
import java.util.concurrent.TimeUnit;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-
/**
* Base class for testing job cancellation.
*/
public abstract class CancelingTestBase extends TestLogger {
- private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
-
private static final int MINIMUM_HEAP_SIZE_MB = 192;
- /**
- * Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job
- * is canceled), starting from the point in time when the cancel request is issued.
- */
- private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
-
- private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+ protected static final int PARALLELISM = 4;
// --------------------------------------------------------------------------------------------
- protected LocalFlinkMiniCluster executor;
-
- protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+ @ClassRule
+ public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 2,
--- End diff --
Possibly, i just copied the existing setup.
---