Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/10/24 15:15:57 UTC

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4896

    [FLINK-7909] Unify Flink test bases

    ## What is the purpose of the change
    
    Introduce a MiniClusterResource which is used by the AbstractTestBase to start
    and shut down a FlinkMiniCluster. Additionally, this resource registers the proper
    Stream- and ExecutionEnvironment, which is now the only way for tests to start
    jobs. This change thus makes it possible to centrally control which Flink cluster
    is started for all test bases.
    
    The AbstractTestBase fully subsumes the functionality of the
    StreamingMultipleProgramsTestBase, since it is now the most general test base
    for streaming and batch jobs. As a consequence, we can safely remove the
    StreamingMultipleProgramsTestBase and let all corresponding tests extend from
    AbstractTestBase.
    
    ## Brief change log
    
    - Introduce `MiniClusterResource` which starts a `FlinkMiniCluster` and registers a `TestEnvironment` and a `TestStreamEnvironment`
    - Add `MiniClusterResource` as a `ClassRule` to `AbstractTestBase`
    - Remove `StreamingMultipleProgramsTestBase`
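    The core idea of the change log (one cluster per test class, exposed through the environment that tests obtain via `getExecutionEnvironment()`) can be modeled without Flink or JUnit. This is only a sketch: `FakeCluster`, `Environment`, and `ClusterResource` are hypothetical stand-ins for `FlinkMiniCluster`, the `TestEnvironment`/`TestStreamEnvironment` registration, and `MiniClusterResource`, and the parallelism of 4 is an arbitrary choice.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Supplier;

    public class SharedClusterResourceSketch {

        /** Hypothetical stand-in for a mini cluster; counts how often one is started. */
        static final class FakeCluster {
            static final AtomicInteger STARTS = new AtomicInteger();
            final int parallelism;

            FakeCluster(int parallelism) {
                this.parallelism = parallelism;
                STARTS.incrementAndGet();
            }

            String submit(String job) {
                return job + " ran with parallelism " + parallelism;
            }

            void shutdown() {}
        }

        /** Hypothetical global hook: tests only ever ask this for an environment. */
        static final class Environment {
            private static Supplier<FakeCluster> contextFactory;

            static void setAsContext(Supplier<FakeCluster> factory) { contextFactory = factory; }

            static void unsetAsContext() { contextFactory = null; }

            static FakeCluster getExecutionEnvironment() {
                if (contextFactory == null) {
                    throw new IllegalStateException("no cluster registered as context");
                }
                return contextFactory.get();
            }
        }

        /** Class-level resource: start once, register the environment, tear both down afterwards. */
        static final class ClusterResource {
            private FakeCluster cluster;

            void before() {
                FakeCluster started = new FakeCluster(4);
                cluster = started;
                Environment.setAsContext(() -> started);
            }

            void after() {
                Environment.unsetAsContext();
                cluster.shutdown();
                cluster = null;
            }
        }

        public static void main(String[] args) {
            ClusterResource resource = new ClusterResource();
            resource.before();
            try {
                // Two "test methods" share one cluster instead of each starting their own.
                System.out.println(Environment.getExecutionEnvironment().submit("someTest"));
                System.out.println(Environment.getExecutionEnvironment().submit("anotherTest"));
            } finally {
                resource.after();
            }
        }
    }
    ```

    Because the cluster is only reachable through the registered context, swapping the cluster implementation for all test bases means changing a single place.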
    
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixAbstractTestBase

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4896
    
----
commit f52a06a133d47ff99b65c68b858f54ad18415d65
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-10-23T15:15:07Z

    [hotfix] [tests] Remove AbstractTestBase from CsvOutputFormatITCase and TextOutputFormatITCase

commit ef9cb2be617535082b37fcd69140176b567e54ed
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-10-24T09:32:05Z

    [FLINK-7909] Unify Flink test bases
    
    Introduce a MiniClusterResource which is used by the AbstractTestBase to start
    and shut down a FlinkMiniCluster. Additionally, this resource registers the proper
    Stream- and ExecutionEnvironment which is now the only way for tests to start
    jobs. This change will thus allow to centrally control which FlinkCluster will
    be started for all test bases.

commit 105ef865b135bdc75f39ab483f979e6bf730e7fe
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-10-24T14:20:15Z

    [FLINK-7909] Replace StreamingMultipleProgramsTestBase by AbstractTestBase
    
    The AbstractTestBase fully subsumes the functionality of the
    StreamingMultipleProgramsTestBase since it now is the most general test base
    for streaming and batch jobs. As a consequence, we can safely remove the
    StreamingMultipleProgramsTestBase and let all corresponding tests extend from
    AbstractTestBase.

----


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r158265225
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---
    @@ -19,81 +19,61 @@
     package org.apache.flink.test.util;
     
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.runtime.akka.AkkaUtils;
    -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
     import org.apache.flink.util.FileUtils;
     
     import org.junit.ClassRule;
     import org.junit.rules.TemporaryFolder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.io.File;
     import java.io.IOException;
    -import java.util.Objects;
    -
    -import scala.concurrent.duration.FiniteDuration;
     
     /**
    - * A base class for tests that run test programs in a Flink mini cluster.
    + * Base class for unit tests that run multiple tests and want to reuse the same
    + * Flink cluster. This saves a significant amount of time, since the startup and
    + * shutdown of the Flink clusters (including actor systems, etc) usually dominates
    + * the execution of the actual tests.
    + *
    + * <p>To write a unit test against this test base, simply extend it and add
    + * one or more regular test methods and retrieve the StreamExecutionEnvironment from
    + * the context:
    + *
    + * <pre>
    + *   {@literal @}Test
    + *   public void someTest() {
    + *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + *   {@literal @}Test
    + *   public void anotherTest() {
    + *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + * </pre>
      */
     public abstract class AbstractTestBase extends TestBaseUtils {
     
    -	/** Configuration to start the testing cluster with. */
    -	protected final Configuration config;
    +	protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
    --- End diff --
    
    Maybe `protected final Logger log = LoggerFactory.getLogger(getClass());` so that the class name of the implementation is logged.
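    A minimal illustration of the difference, assuming `java.util.logging` as a stand-in for SLF4J (the naming behavior is the same: the logger is named after whatever class is passed in). `Base` and `MyITCase` are hypothetical names.

    ```java
    import java.util.logging.Logger;

    public class LoggerNameSketch {

        /** Stand-in for AbstractTestBase. */
        static abstract class Base {
            // Static logger: always named after the base class.
            static final Logger STATIC_LOG = Logger.getLogger(Base.class.getName());

            // Instance logger: getClass() returns the runtime subclass, even inside a field initializer.
            protected final Logger log = Logger.getLogger(getClass().getName());
        }

        /** Hypothetical concrete test class extending the base. */
        static class MyITCase extends Base {}

        public static void main(String[] args) {
            MyITCase test = new MyITCase();
            System.out.println(Base.STATIC_LOG.getName()); // name ends with "Base"
            System.out.println(test.log.getName());        // name ends with "MyITCase"
        }
    }
    ```

    The trade-off is one logger lookup per test instance instead of one per class, which is negligible for tests.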


---

[GitHub] flink issue #4896: [FLINK-7909] Unify Flink test bases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4896
  
    Thanks for your review @GJL. I've addressed all your comments. I rebased this PR onto #4890. Once Travis gives green light, I'll merge it.


---

[GitHub] flink issue #4896: [FLINK-7909] Unify Flink test bases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4896
  
    It compiles now @zentol.


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r158257284
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---
    @@ -19,81 +19,61 @@
     package org.apache.flink.test.util;
     
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.runtime.akka.AkkaUtils;
    -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
     import org.apache.flink.util.FileUtils;
     
     import org.junit.ClassRule;
     import org.junit.rules.TemporaryFolder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.io.File;
     import java.io.IOException;
    -import java.util.Objects;
    -
    -import scala.concurrent.duration.FiniteDuration;
     
     /**
    - * A base class for tests that run test programs in a Flink mini cluster.
    + * Base class for unit tests that run multiple tests and want to reuse the same
    + * Flink cluster. This saves a significant amount of time, since the startup and
    + * shutdown of the Flink clusters (including actor systems, etc) usually dominates
    + * the execution of the actual tests.
    + *
    + * <p>To write a unit test against this test base, simply extend it and add
    + * one or more regular test methods and retrieve the StreamExecutionEnvironment from
    + * the context:
    + *
    + * <pre>
    + *   {@literal @}Test
    + *   public void someTest() {
    + *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + *   {@literal @}Test
    + *   public void anotherTest() {
    + *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + * </pre>
      */
     public abstract class AbstractTestBase extends TestBaseUtils {
     
    -	/** Configuration to start the testing cluster with. */
    -	protected final Configuration config;
    +	protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
     
    -	private final FiniteDuration timeout;
    +	private static final int DEFAULT_PARALLELISM = 4;
     
    -	protected int taskManagerNumSlots = 1;
    +	protected static final Configuration MINICLUSTER_CONFIGURATION = new Configuration();
     
    -	protected int numTaskManagers = 1;
    +	@ClassRule
    --- End diff --
    
    `miniClusterResource` will be initialized before `MINICLUSTER_CONFIGURATION` can be modified by a `@BeforeClass` method, i.e., in some cases the configuration cannot be supplied in time.
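    The ordering problem can be modeled with plain `Runnable`s. This is a stdlib-only simulation, not JUnit itself; it relies on the fact that JUnit 4 applies class rules outside the `@BeforeClass`/`@AfterClass` statements, so a rule's setup runs before any `@BeforeClass` method. The `StringBuilder` stands in for the shared configuration.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class ClassRuleOrderingSketch {

        /** Runs a simulated test class and returns the lifecycle events in order. */
        static List<String> simulate() {
            List<String> events = new ArrayList<>();
            // Stand-in for the shared, mutable MINICLUSTER_CONFIGURATION.
            StringBuilder config = new StringBuilder();

            Runnable tests = () -> events.add("tests run");

            // Simulated @BeforeClass method that tries to adjust the configuration.
            Runnable beforeClass = () -> {
                config.append("foo=bar");
                events.add("@BeforeClass modified config");
                tests.run();
            };

            // Simulated @ClassRule: it wraps @BeforeClass, so the cluster starts first.
            Runnable classRule = () -> {
                events.add("cluster started with config '" + config + "'");
                beforeClass.run();
                events.add("cluster stopped");
            };

            classRule.run();
            return events;
        }

        public static void main(String[] args) {
            simulate().forEach(System.out::println);
        }
    }
    ```

    The cluster sees an empty configuration. One possible workaround (an assumption, not necessarily what this PR ends up doing) is to build the configuration in a static factory method that is called directly in the rule's field initializer, so it is complete before the rule is even constructed.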


---

[GitHub] flink issue #4896: [FLINK-7909] Unify Flink test bases

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4896
  
    Do you intend to merge this for 1.4? (I would prefer merging it after the fork).


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r160172988
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java ---
    @@ -66,47 +47,34 @@ protected void postSubmit() throws Exception {}
     
     	@Test
     	public void testJob() throws Exception {
    +		// pre-submit
     		try {
    -			// pre-submit
    -			try {
    -				preSubmit();
    -			}
    -			catch (Exception e) {
    -				System.err.println(e.getMessage());
    -				e.printStackTrace();
    -				fail("Pre-submit work caused an error: " + e.getMessage());
    -			}
    -
    -			// prepare the test environment
    -			startCluster();
    -
    -			TestStreamEnvironment.setAsContext(this.executor, getParallelism());
    +			preSubmit();
    +		}
    +		catch (Exception e) {
    +			System.err.println(e.getMessage());
    +			e.printStackTrace();
    +			fail("Pre-submit work caused an error: " + e.getMessage());
    --- End diff --
    
    Will rework it.


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4896


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r160158137
  
    --- Diff: flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala ---
    @@ -23,8 +23,7 @@ import java.io.File
     import org.apache.commons.io.FileUtils
     import org.apache.flink.core.fs.FileSystem.WriteMode
     import org.apache.flink.streaming.api.TimeCharacteristic
    -import org.apache.flink.streaming.api.scala._
    -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
    --- End diff --
    
    Will change it.


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r160157769
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---
    @@ -19,81 +19,61 @@
     package org.apache.flink.test.util;
     
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.runtime.akka.AkkaUtils;
    -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
     import org.apache.flink.util.FileUtils;
     
     import org.junit.ClassRule;
     import org.junit.rules.TemporaryFolder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.io.File;
     import java.io.IOException;
    -import java.util.Objects;
    -
    -import scala.concurrent.duration.FiniteDuration;
     
     /**
    - * A base class for tests that run test programs in a Flink mini cluster.
    + * Base class for unit tests that run multiple tests and want to reuse the same
    + * Flink cluster. This saves a significant amount of time, since the startup and
    + * shutdown of the Flink clusters (including actor systems, etc) usually dominates
    + * the execution of the actual tests.
    + *
    + * <p>To write a unit test against this test base, simply extend it and add
    + * one or more regular test methods and retrieve the StreamExecutionEnvironment from
    + * the context:
    + *
    + * <pre>
    + *   {@literal @}Test
    + *   public void someTest() {
    + *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + *   {@literal @}Test
    + *   public void anotherTest() {
    + *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + * </pre>
      */
     public abstract class AbstractTestBase extends TestBaseUtils {
     
    -	/** Configuration to start the testing cluster with. */
    -	protected final Configuration config;
    +	protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
     
    -	private final FiniteDuration timeout;
    +	private static final int DEFAULT_PARALLELISM = 4;
     
    -	protected int taskManagerNumSlots = 1;
    +	protected static final Configuration MINICLUSTER_CONFIGURATION = new Configuration();
     
    -	protected int numTaskManagers = 1;
    +	@ClassRule
    --- End diff --
    
    Will remove the `MINICLUSTER_CONFIGURATION` completely.


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r158254745
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---
    @@ -19,81 +19,61 @@
     package org.apache.flink.test.util;
     
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.runtime.akka.AkkaUtils;
    -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
     import org.apache.flink.util.FileUtils;
     
     import org.junit.ClassRule;
     import org.junit.rules.TemporaryFolder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.io.File;
     import java.io.IOException;
    -import java.util.Objects;
    -
    -import scala.concurrent.duration.FiniteDuration;
     
     /**
    - * A base class for tests that run test programs in a Flink mini cluster.
    + * Base class for unit tests that run multiple tests and want to reuse the same
    + * Flink cluster. This saves a significant amount of time, since the startup and
    + * shutdown of the Flink clusters (including actor systems, etc) usually dominates
    + * the execution of the actual tests.
    + *
    + * <p>To write a unit test against this test base, simply extend it and add
    + * one or more regular test methods and retrieve the StreamExecutionEnvironment from
    + * the context:
    + *
    + * <pre>
    + *   {@literal @}Test
    + *   public void someTest() {
    + *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + *   {@literal @}Test
    + *   public void anotherTest() {
    + *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + * </pre>
      */
     public abstract class AbstractTestBase extends TestBaseUtils {
     
    -	/** Configuration to start the testing cluster with. */
    -	protected final Configuration config;
    +	protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
     
    -	private final FiniteDuration timeout;
    +	private static final int DEFAULT_PARALLELISM = 4;
     
    -	protected int taskManagerNumSlots = 1;
    +	protected static final Configuration MINICLUSTER_CONFIGURATION = new Configuration();
    --- End diff --
    
    I think it's dangerous to have mutable global state. 
    
    For example, if I have the following two tests:
    ```
    public class TestTest extends AbstractTestBase {
    
    	@BeforeClass
    	public static void setUp() throws Exception {
    		MINICLUSTER_CONFIGURATION.setString("foo", "bar");
    	}
    
    	@Test
    	public void name() throws Exception {
    		System.out.println(MINICLUSTER_CONFIGURATION);
    	}
    
    }
    ```
    
    ```
    public class TestTest2 extends AbstractTestBase {
    
    	@Test
    	public void name() throws Exception {
    		System.out.println(MINICLUSTER_CONFIGURATION);
    	}
    
    }
    ```
    and run them both from IntelliJ, `{foo=bar}` is printed twice.
    
    `MINICLUSTER_CONFIGURATION` is never cleaned up.
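    The leak can be reproduced without Flink. In this sketch, `Map<String, String>` stands in for Flink's `Configuration`, both test classes are simulated in one JVM (as when IntelliJ runs them together), and `createConfig` is a hypothetical alternative in which each test class builds its own configuration.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    public class SharedConfigLeakSketch {

        /** Stand-in for AbstractTestBase with a shared, mutable static configuration. */
        static abstract class Base {
            static final Map<String, String> SHARED_CONFIG = new HashMap<>();
        }

        static class TestTest extends Base {
            static void setUp() {
                SHARED_CONFIG.put("foo", "bar");
            }
        }

        static class TestTest2 extends Base {
            static Map<String, String> observedConfig() {
                return SHARED_CONFIG; // same object that TestTest mutated
            }
        }

        /** Safer pattern: every test class gets a fresh configuration instance. */
        static Map<String, String> createConfig(boolean withFoo) {
            Map<String, String> config = new HashMap<>();
            if (withFoo) {
                config.put("foo", "bar");
            }
            return config;
        }

        public static void main(String[] args) {
            TestTest.setUp();
            System.out.println(TestTest2.observedConfig()); // leaked entry from TestTest
            System.out.println(createConfig(false));        // unaffected by other classes
        }
    }
    ```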


---

[GitHub] flink issue #4896: [FLINK-7909] Unify Flink test bases

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4896
  
    Doesn't compile:
    ```
    [INFO] flink-hadoop-compatibility ......................... FAILURE [  5.449 s]
    [INFO] flink-avro ......................................... SKIPPED
    [INFO] flink-tests ........................................ SKIPPED
    [INFO] flink-streaming-scala .............................. SKIPPED
    [INFO] flink-scala-shell .................................. SKIPPED
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD FAILURE
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 03:32 min
    [INFO] Finished at: 2017-10-24T23:22:39+00:00
    [INFO] Final Memory: 144M/870M
    [INFO] ------------------------------------------------------------------------
    [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:testCompile (default-testCompile) on project flink-hadoop-compatibility_2.11: Compilation failure: Compilation failure:
    [ERROR] /home/travis/build/apache/flink/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java:[49,21] cannot find symbol
    [ERROR] symbol: method setParallelism(int)
    [ERROR] /home/travis/build/apache/flink/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java:[61,33] cannot find symbol
    [ERROR] symbol:   variable config
    [ERROR] location: class org.apache.flink.test.hadoopcompatibility.mapred.HadoopIOFormatsITCase
    [ERROR] /home/travis/build/apache/flink/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java:[68,17] constructor JavaProgramTestBase in class org.apache.flink.test.util.JavaProgramTestBase cannot be applied to given types;
    [ERROR] required: no arguments
    [ERROR] found: org.apache.flink.configuration.Configuration
    [ERROR] reason: actual and formal argument lists differ in length
    [ERROR] /home/travis/build/apache/flink/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java:[47,21] cannot find symbol
    [ERROR] symbol: method setParallelism(int)
    ```


---

[GitHub] flink issue #4896: [FLINK-7909] Unify Flink test bases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4896
  
    Yes, this can wait until after the fork. Moreover, I will do what @StephanEwen suggested and count the number of executed tests before and after this change to make sure that we don't introduce a regression in the number of actually executed tests.


---

[GitHub] flink issue #4896: [FLINK-7909] Unify Flink test bases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4896
  
    The diffs look good, but what I cannot judge conclusively is whether some tests are (accidentally) no longer executed.
    
    What would be good is to take the Travis output from the profiles that run tests in `flink-runtime` and `flink-tests` and compare the number of executed tests (Maven prints it in the "test" and "verify" summaries). If they are still the same (or differ by a number explained by the refactoring), then +1 to merge.
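    The comparison could be scripted. This sketch assumes that Surefire/Failsafe print a summary line of the form `Tests run: N, Failures: F, Errors: E, Skipped: S` (optionally prefixed by a log level such as `[INFO]`), while per-class lines additionally end with `, Time elapsed: ...` and are therefore excluded to avoid double counting. The exact format varies between plugin versions, so check it against real Travis logs first; the log excerpts below are made up.

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TestCountSketch {

        // Assumed summary format; per-class lines with a trailing "Time elapsed" do not match.
        private static final Pattern SUMMARY = Pattern.compile(
            "^(?:\\[\\w+\\]\\s+)?Tests run: (\\d+), Failures: (\\d+), Errors: (\\d+), Skipped: (\\d+)\\s*$");

        /** Returns {total tests run, total skipped} summed over all summary lines of a build log. */
        static int[] totals(List<String> logLines) {
            int run = 0;
            int skipped = 0;
            for (String line : logLines) {
                Matcher m = SUMMARY.matcher(line.trim());
                if (m.matches()) {
                    run += Integer.parseInt(m.group(1));
                    skipped += Integer.parseInt(m.group(4));
                }
            }
            return new int[] {run, skipped};
        }

        public static void main(String[] args) {
            List<String> before = Arrays.asList(
                "Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.2 sec - in org.example.FooITCase",
                "Tests run: 3, Failures: 0, Errors: 0, Skipped: 0",
                "[INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 1");
            List<String> after = Arrays.asList(
                "Tests run: 8, Failures: 0, Errors: 0, Skipped: 1");
            int[] b = totals(before);
            int[] a = totals(after);
            System.out.printf("before: run=%d skipped=%d, after: run=%d skipped=%d, same=%b%n",
                b[0], b[1], a[0], a[1], Arrays.equals(a, b));
        }
    }
    ```

    Comparing skipped counts separately matters here, since a test that is silently skipped (for example by a failing `Assume`) still shows up in "Tests run".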


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r160171312
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---
    @@ -19,81 +19,61 @@
     package org.apache.flink.test.util;
     
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.runtime.akka.AkkaUtils;
    -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
     import org.apache.flink.util.FileUtils;
     
     import org.junit.ClassRule;
     import org.junit.rules.TemporaryFolder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.io.File;
     import java.io.IOException;
    -import java.util.Objects;
    -
    -import scala.concurrent.duration.FiniteDuration;
     
     /**
    - * A base class for tests that run test programs in a Flink mini cluster.
    + * Base class for unit tests that run multiple tests and want to reuse the same
    + * Flink cluster. This saves a significant amount of time, since the startup and
    + * shutdown of the Flink clusters (including actor systems, etc) usually dominates
    + * the execution of the actual tests.
    + *
    + * <p>To write a unit test against this test base, simply extend it and add
    + * one or more regular test methods and retrieve the StreamExecutionEnvironment from
    + * the context:
    + *
    + * <pre>
    + *   {@literal @}Test
    + *   public void someTest() {
    + *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + *   {@literal @}Test
    + *   public void anotherTest() {
    + *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + * </pre>
      */
     public abstract class AbstractTestBase extends TestBaseUtils {
     
    -	/** Configuration to start the testing cluster with. */
    -	protected final Configuration config;
    +	protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
    --- End diff --
    
    Good idea. Will change it.


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r160171032
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala ---
    @@ -18,8 +18,8 @@
     
     package org.apache.flink.table.runtime.stream
     
    -import java.math.BigDecimal
     import java.lang.{Integer => JInt, Long => JLong}
    +import java.math.BigDecimal
    --- End diff --
    
    I think we never properly specified the import order for Scala.


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r158259995
  
    --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java ---
    @@ -55,6 +55,7 @@ public static void main(String[] args) throws Exception {
     		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
     		env.getConfig().setGlobalJobParameters(params);
    +		env.setParallelism(1);
    --- End diff --
    
    Is this strictly needed? It's not a unit test or an ITCase, and the example seems to work without this line.


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r158259149
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java ---
    @@ -215,23 +211,15 @@ private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
     			dfs.mkdirs(new Path("/flink/checkpoints"));
     			dfs.mkdirs(new Path("/flink/recovery"));
     
    -			org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
    -
    -			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
    -			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
    -			config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
    -			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
    -			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    -			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
    -			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
    -			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
    -			config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
    -
    -			SecureTestEnvironment.populateFlinkSecureConfigurations(config);
    -
    -			cluster = TestBaseUtils.startCluster(config, false);
    -			TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
    +			MINICLUSTER_CONFIGURATION.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
    --- End diff --
    
    Because of this:
    ```
    private static void skipIfHadoopVersionIsNotAppropriate() {
    	// Skips all tests if the Hadoop version doesn't match
    	String hadoopVersionString = VersionInfo.getVersion();
    	String[] split = hadoopVersionString.split("\\.");
    	if (split.length != 3) {
    		throw new IllegalStateException("Hadoop version was not of format 'X.X.X': " + hadoopVersionString);
    	}
    	Assume.assumeTrue(
    		// check whether we're running Hadoop version >= 3.x.x
    		Integer.parseInt(split[0]) >= 3
    	);
    }
    ```
    I assume that the test will never run, since it is skipped for every Hadoop version below 3.x.
    
    I wonder if the test has ever worked correctly.
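    The guard's logic can be checked in isolation. This stdlib-only sketch replaces `VersionInfo.getVersion()` with a string parameter and `Assume.assumeTrue` with a boolean return value; `HadoopVersionGuardSketch` and `wouldRun` are hypothetical names.

    ```java
    public class HadoopVersionGuardSketch {

        /** Mirrors the quoted guard: true only if the Hadoop major version is at least 3. */
        static boolean wouldRun(String hadoopVersionString) {
            String[] split = hadoopVersionString.split("\\.");
            if (split.length != 3) {
                throw new IllegalStateException("Hadoop version was not of format 'X.X.X': " + hadoopVersionString);
            }
            return Integer.parseInt(split[0]) >= 3;
        }

        public static void main(String[] args) {
            for (String version : new String[] {"2.4.1", "2.8.1", "3.0.0"}) {
                System.out.println(version + " -> " + (wouldRun(version) ? "runs" : "skipped"));
            }
        }
    }
    ```

    Any 2.x version string yields `false`, i.e. the JUnit assumption fails and the affected tests are reported as skipped rather than failed, which is easy to miss in a green build.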


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r158260663
  
    --- Diff: flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala ---
    @@ -23,8 +23,7 @@ import java.io.File
     import org.apache.commons.io.FileUtils
     import org.apache.flink.core.fs.FileSystem.WriteMode
     import org.apache.flink.streaming.api.TimeCharacteristic
    -import org.apache.flink.streaming.api.scala._
    -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
    --- End diff --
    
    nit: The import looks strange. I think
    
    ```
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    ```
    is enough.



---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r160165030
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java ---
    @@ -215,23 +211,15 @@ private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
     			dfs.mkdirs(new Path("/flink/checkpoints"));
     			dfs.mkdirs(new Path("/flink/recovery"));
     
    -			org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
    -
    -			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
    -			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
    -			config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
    -			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
    -			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    -			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
    -			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
    -			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
    -			config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
    -
    -			SecureTestEnvironment.populateFlinkSecureConfigurations(config);
    -
    -			cluster = TestBaseUtils.startCluster(config, false);
    -			TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
    +			MINICLUSTER_CONFIGURATION.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
    --- End diff --
    
    True, will rework the way we instantiate the `MiniClusterResource` in this test case.


---

[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r160170300
  
    --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java ---
    @@ -55,6 +55,7 @@ public static void main(String[] args) throws Exception {
     		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
     		env.getConfig().setGlobalJobParameters(params);
    +		env.setParallelism(1);
    --- End diff --
    
    Yes, it's because of `TopSpeedWindowingExampleITCase`. However, that is not a nice solution, so I decided to rework `TopSpeedWindowingExampleITCase` instead.
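
The thread does not say exactly why the example needed `setParallelism(1)`. If the ITCase compared the job's output lines in a fixed order, one way to rework it is to compare expected and actual output as sorted line lists. The example could then keep its default parallelism. This only helps when parallelism changes the order of the output, not its content. `OrderInsensitiveCheck` is a hypothetical JDK-only helper, not Flink's test utility API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: order-insensitive comparison of line-based job output.
final class OrderInsensitiveCheck {

    // True if both outputs contain the same lines with the same multiplicities,
    // regardless of the order in which parallel tasks emitted them.
    static boolean sameLinesIgnoringOrder(String expected, String actual) {
        List<String> e = new ArrayList<>(Arrays.asList(expected.split("\n")));
        List<String> a = new ArrayList<>(Arrays.asList(actual.split("\n")));
        Collections.sort(e);
        Collections.sort(a);
        return e.equals(a);
    }
}
```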


---

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r158261437
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala ---
    @@ -18,8 +18,8 @@
     
     package org.apache.flink.table.runtime.stream
     
    -import java.math.BigDecimal
     import java.lang.{Integer => JInt, Long => JLong}
    +import java.math.BigDecimal
    --- End diff --
    
    nit: Do we also follow the import order described for Java at http://flink.apache.org/contribute-code.html#imports in Scala code?
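
For reference, here is a small sketch of a grouped import order. It assumes the groups are Flink, other third-party, `javax`, `java`, `scala`, with lexicographic order inside each group. That assumption is consistent with the Java imports in the `AbstractTestBase` diff in this thread and with the `java.lang`/`java.math` reordering in the Scala diff above. `ImportOrderSketch` is a hypothetical helper, not Flink's checkstyle configuration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical import-order helper; the group list below is an assumption.
final class ImportOrderSketch {

    // Group index for an imported name: lower indices must come first.
    static int group(String imported) {
        if (imported.startsWith("org.apache.flink.")) return 0;
        if (imported.startsWith("javax.")) return 2;
        if (imported.startsWith("java.")) return 3;
        if (imported.startsWith("scala.")) return 4;
        return 1; // all other third-party imports
    }

    // Sorts by group first, then lexicographically within a group.
    static List<String> sorted(List<String> imports) {
        List<String> copy = new ArrayList<>(imports);
        copy.sort(Comparator.comparingInt(ImportOrderSketch::group)
                .thenComparing(Comparator.naturalOrder()));
        return copy;
    }
}
```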


---

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r158264752
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java ---
    @@ -66,47 +47,34 @@ protected void postSubmit() throws Exception {}
     
     	@Test
     	public void testJob() throws Exception {
    +		// pre-submit
     		try {
    -			// pre-submit
    -			try {
    -				preSubmit();
    -			}
    -			catch (Exception e) {
    -				System.err.println(e.getMessage());
    -				e.printStackTrace();
    -				fail("Pre-submit work caused an error: " + e.getMessage());
    -			}
    -
    -			// prepare the test environment
    -			startCluster();
    -
    -			TestStreamEnvironment.setAsContext(this.executor, getParallelism());
    +			preSubmit();
    +		}
    +		catch (Exception e) {
    +			System.err.println(e.getMessage());
    +			e.printStackTrace();
    +			fail("Pre-submit work caused an error: " + e.getMessage());
    --- End diff --
    
    nit: The test would fail on an exception anyway, so the try/catch with `fail(...)` isn't really needed. If you'd rather leave it in to keep the diff smaller, that's also ok.
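
Here is a JUnit-free sketch of the simplification suggested above. Because `testJob()` is already declared `throws Exception`, an exception from `preSubmit()` can simply propagate. The test runner then reports it as a failure with its original stack trace, without the extra catch-and-`fail` boilerplate. `ProgramTestSketch` and `testProgram()` are hypothetical names. `preSubmit()` and `postSubmit()` mirror the hooks in the diff.

```java
// Hypothetical, JUnit-free sketch of the simplified template method.
abstract class ProgramTestSketch {

    protected void preSubmit() throws Exception {}

    protected abstract void testProgram() throws Exception;

    protected void postSubmit() throws Exception {}

    // In the real test base this method would carry @Test.
    public final void testJob() throws Exception {
        preSubmit();   // no try/catch + fail(...): an exception here already fails the test
        testProgram();
        postSubmit();
    }
}
```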


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4896#discussion_r160156245
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---
    @@ -19,81 +19,61 @@
     package org.apache.flink.test.util;
     
     import org.apache.flink.configuration.Configuration;
    -import org.apache.flink.runtime.akka.AkkaUtils;
    -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
     import org.apache.flink.util.FileUtils;
     
     import org.junit.ClassRule;
     import org.junit.rules.TemporaryFolder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.io.File;
     import java.io.IOException;
    -import java.util.Objects;
    -
    -import scala.concurrent.duration.FiniteDuration;
     
     /**
    - * A base class for tests that run test programs in a Flink mini cluster.
    + * Base class for unit tests that run multiple tests and want to reuse the same
    + * Flink cluster. This saves a significant amount of time, since the startup and
    + * shutdown of the Flink clusters (including actor systems, etc) usually dominates
    + * the execution of the actual tests.
    + *
    + * <p>To write a unit test against this test base, simply extend it and add
    + * one or more regular test methods and retrieve the StreamExecutionEnvironment from
    + * the context:
    + *
    + * <pre>
    + *   {@literal @}Test
    + *   public void someTest() {
    + *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + *   {@literal @}Test
    + *   public void anotherTest() {
    + *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    + *       // test code
    + *       env.execute();
    + *   }
    + *
    + * </pre>
      */
     public abstract class AbstractTestBase extends TestBaseUtils {
     
    -	/** Configuration to start the testing cluster with. */
    -	protected final Configuration config;
    +	protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
     
    -	private final FiniteDuration timeout;
    +	private static final int DEFAULT_PARALLELISM = 4;
     
    -	protected int taskManagerNumSlots = 1;
    +	protected static final Configuration MINICLUSTER_CONFIGURATION = new Configuration();
    --- End diff --
    
    You're right. This is bad and I'll fix it.
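
The reviewer's comment is not quoted here. Suppose the concern is that `static final` only fixes the reference, while the `Configuration` object itself stays mutable. In that case a subclass such as `RollingSinkSecuredITCase` calling `MINICLUSTER_CONFIGURATION.setBoolean(...)` changes the configuration for every later test class in the same JVM. The sketch below contrasts that with a factory that hands out a fresh copy. It uses plain `HashMap`s as a hypothetical stand-in for Flink's `Configuration`. `SharedConfigSketch` and `defaultConfiguration()` are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration with plain Java maps, not Flink's Configuration class.
final class SharedConfigSketch {

    // Problematic: 'final' fixes the reference, but the map itself stays mutable,
    // so one test class's put(...) is visible to every later test class in the JVM.
    static final Map<String, String> SHARED = new HashMap<>();

    // Safer: every caller gets its own fresh copy of the defaults.
    static Map<String, String> defaultConfiguration() {
        Map<String, String> config = new HashMap<>();
        config.put("startWebServer", "true");
        return config;
    }
}
```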


---