Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/03/09 08:54:59 UTC

[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5664

    [FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource

    ## What is the purpose of the change
    
    Ports `CancelingTestBase` to use `MiniClusterResource`.
    
    ## Verifying this change
    
    Run `MapCancelingTestBase` with `flip6` profile enabled/disabled.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 8703_canceling

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5664.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5664
    
----
commit 57338df4819b2324f7ede2b131f81d83bc9096b2
Author: zentol <ch...@...>
Date:   2018-02-26T14:36:37Z

    [FLINK-8703][tests] Port CancelingTestBase to MiniClusterResource

commit 22d4a2f02c256eb41a1684a5766a1dd53dc9351d
Author: zentol <ch...@...>
Date:   2018-02-28T12:43:42Z

    [hotfix][tests] Properly disable JoinCancelingITCase

----


---

[GitHub] flink issue #5664: [FLINK-8703][tests] Port CancelingTestBase to MiniCluster...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5664
  
    merging.


---

[GitHub] flink issue #5664: [FLINK-8703][tests] Port CancelingTestBase to MiniCluster...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5664
  
    Interesting, there is an aborted test run with a core dump.


---

[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5664#discussion_r177794734
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java ---
    @@ -18,160 +18,111 @@
     
     package org.apache.flink.test.cancelling;
     
    +import org.apache.flink.api.common.JobSubmissionResult;
     import org.apache.flink.api.common.Plan;
    +import org.apache.flink.client.program.ClusterClient;
     import org.apache.flink.configuration.AkkaOptions;
    -import org.apache.flink.configuration.ConfigConstants;
     import org.apache.flink.configuration.Configuration;
     import org.apache.flink.configuration.CoreOptions;
     import org.apache.flink.configuration.TaskManagerOptions;
     import org.apache.flink.optimizer.DataStatistics;
     import org.apache.flink.optimizer.Optimizer;
     import org.apache.flink.optimizer.plan.OptimizedPlan;
     import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
    -import org.apache.flink.runtime.instance.ActorGateway;
     import org.apache.flink.runtime.jobgraph.JobGraph;
     import org.apache.flink.runtime.jobgraph.JobStatus;
    -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
     import org.apache.flink.runtime.testingUtils.TestingUtils;
    -import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
    +import org.apache.flink.test.util.MiniClusterResource;
     import org.apache.flink.util.TestLogger;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.junit.After;
     import org.junit.Assert;
    -import org.junit.Before;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    +import org.junit.ClassRule;
     
     import java.util.concurrent.TimeUnit;
     
    -import scala.concurrent.Await;
    -import scala.concurrent.Future;
    +import scala.concurrent.duration.Deadline;
     import scala.concurrent.duration.FiniteDuration;
     
    -import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
    -import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
    -import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
    -
     /**
      * Base class for testing job cancellation.
      */
     public abstract class CancelingTestBase extends TestLogger {
     
    -	private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
    -
     	private static final int MINIMUM_HEAP_SIZE_MB = 192;
     
    -	/**
    -	 * Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job
    -	 * is canceled), starting from the point in time when the cancel request is issued.
    -	 */
    -	private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
    -
    -	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
    +	protected static final int PARALLELISM = 4;
     
     	// --------------------------------------------------------------------------------------------
     
    -	protected LocalFlinkMiniCluster executor;
    -
    -	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
    +	@ClassRule
    +	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
    +		new MiniClusterResource.MiniClusterResourceConfiguration(
    +			getConfiguration(),
    +			2,
    --- End diff --
    
    Why do we start a mini cluster with 2 TMs and 4 slots per TM? Wouldn't a single TM be sufficient?
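
The arithmetic behind this question can be sketched as follows. This is a minimal illustration, not part of the PR: `SlotCheck`, `totalSlots`, and `fits` are hypothetical names, and it assumes slot sharing, so a job needs roughly one slot per parallel instance. The values come from the thread (`PARALLELISM = 4`, 2 TMs, 4 slots per TM).

```java
public class SlotCheck {

    /** Total slots offered by a cluster of identically configured TaskManagers. */
    public static int totalSlots(int numTaskManagers, int slotsPerTaskManager) {
        return numTaskManagers * slotsPerTaskManager;
    }

    /**
     * Assumption: with slot sharing, a job needs one slot per parallel instance,
     * so it fits if its parallelism does not exceed the total slot count.
     */
    public static boolean fits(int parallelism, int numTaskManagers, int slotsPerTaskManager) {
        return parallelism <= totalSlots(numTaskManagers, slotsPerTaskManager);
    }

    public static void main(String[] args) {
        int parallelism = 4; // PARALLELISM in the diff

        // Setup from the diff as described in the review comment: 2 TMs, 4 slots each.
        System.out.println("2 TMs x 4 slots: total=" + totalSlots(2, 4)
            + ", fits=" + fits(parallelism, 2, 4));

        // Reviewer's suggestion: a single TM with 4 slots.
        System.out.println("1 TM  x 4 slots: total=" + totalSlots(1, 4)
            + ", fits=" + fits(parallelism, 1, 4));
    }
}
```

Under that assumption, a single TM with 4 slots already covers a parallelism of 4, and the second TM only adds unused capacity.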


---

[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5664#discussion_r177801072
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java ---
    @@ -30,19 +30,18 @@
     import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
     import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
     
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
     /**
      * Test job cancellation from within a JoinFunction.
      */
    +@Ignore("Takes too long.")
    --- End diff --
    
    No, I also didn't check how long it actually takes.


---

[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5664


---

[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5664#discussion_r177794320
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/cancelling/JoinCancelingITCase.java ---
    @@ -30,19 +30,18 @@
     import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
     import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
     
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
     /**
      * Test job cancellation from within a JoinFunction.
      */
    +@Ignore("Takes too long.")
    --- End diff --
    
    Do you know what's taking so long?


---

[GitHub] flink pull request #5664: [FLINK-8703][tests] Port CancelingTestBase to Mini...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5664#discussion_r177800863
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java ---
    @@ -18,160 +18,111 @@
     
     package org.apache.flink.test.cancelling;
     
    +import org.apache.flink.api.common.JobSubmissionResult;
     import org.apache.flink.api.common.Plan;
    +import org.apache.flink.client.program.ClusterClient;
     import org.apache.flink.configuration.AkkaOptions;
    -import org.apache.flink.configuration.ConfigConstants;
     import org.apache.flink.configuration.Configuration;
     import org.apache.flink.configuration.CoreOptions;
     import org.apache.flink.configuration.TaskManagerOptions;
     import org.apache.flink.optimizer.DataStatistics;
     import org.apache.flink.optimizer.Optimizer;
     import org.apache.flink.optimizer.plan.OptimizedPlan;
     import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
    -import org.apache.flink.runtime.instance.ActorGateway;
     import org.apache.flink.runtime.jobgraph.JobGraph;
     import org.apache.flink.runtime.jobgraph.JobStatus;
    -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
     import org.apache.flink.runtime.testingUtils.TestingUtils;
    -import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
    +import org.apache.flink.test.util.MiniClusterResource;
     import org.apache.flink.util.TestLogger;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.junit.After;
     import org.junit.Assert;
    -import org.junit.Before;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    +import org.junit.ClassRule;
     
     import java.util.concurrent.TimeUnit;
     
    -import scala.concurrent.Await;
    -import scala.concurrent.Future;
    +import scala.concurrent.duration.Deadline;
     import scala.concurrent.duration.FiniteDuration;
     
    -import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
    -import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
    -import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
    -
     /**
      * Base class for testing job cancellation.
      */
     public abstract class CancelingTestBase extends TestLogger {
     
    -	private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
    -
     	private static final int MINIMUM_HEAP_SIZE_MB = 192;
     
    -	/**
    -	 * Defines the number of seconds after which an issued cancel request is expected to have taken effect (i.e. the job
    -	 * is canceled), starting from the point in time when the cancel request is issued.
    -	 */
    -	private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10 * 1000;
    -
    -	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
    +	protected static final int PARALLELISM = 4;
     
     	// --------------------------------------------------------------------------------------------
     
    -	protected LocalFlinkMiniCluster executor;
    -
    -	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
    +	@ClassRule
    +	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
    +		new MiniClusterResource.MiniClusterResourceConfiguration(
    +			getConfiguration(),
    +			2,
    --- End diff --
    
    Possibly, I just copied the existing setup.


---