You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/09 08:14:49 UTC
[6/7] flink git commit: [FLINK-7909] Replace
StreamingMultipleProgramsTestBase by AbstractTestBase
[FLINK-7909] Replace StreamingMultipleProgramsTestBase by AbstractTestBase
The AbstractTestBase fully subsumes the functionality of the
StreamingMultipleProgramsTestBase since it now is the most general test base
for streaming and batch jobs. As a consequence, we can safely remove the
StreamingMultipleProgramsTestBase and let all corresponding tests extend from
AbstractTestBase.
This closes #4896.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b90210e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b90210e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b90210e3
Branch: refs/heads/master
Commit: b90210e3712a54ad85a33dfc308a03e0c4a2a250
Parents: 3c5c832
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Oct 24 16:20:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jan 9 08:05:51 2018 +0100
----------------------------------------------------------------------
.../ElasticsearchSinkTestBase.java | 8 ++-
.../connectors/fs/RollingSinkITCase.java | 22 +++++--
.../connectors/fs/RollingSinkSecuredITCase.java | 62 +++++++++----------
.../mapred/HadoopIOFormatsITCase.java | 22 +++----
.../mapred/HadoopMapredITCase.java | 1 -
.../mapreduce/HadoopInputOutputITCase.java | 1 -
.../apache/flink/storm/split/SplitITCase.java | 4 +-
.../examples/windowing/TopSpeedWindowing.java | 1 -
.../streaming/test/StreamingExamplesITCase.java | 4 +-
.../TopSpeedWindowingExampleITCase.java | 50 +++++++++------
.../socket/SocketWindowWordCountITCase.java | 4 +-
.../examples/StreamingExamplesITCase.scala | 22 +++----
.../java/org/apache/flink/cep/CEPITCase.java | 4 +-
.../table/runtime/stream/sql/JavaSqlITCase.java | 4 +-
.../flink/table/api/stream/ExplainTest.scala | 4 +-
.../UnsupportedOpsValidationTest.scala | 4 +-
.../runtime/stream/TimeAttributesITCase.scala | 10 +--
.../runtime/stream/sql/TableSourceITCase.scala | 4 +-
.../table/runtime/stream/table/CalcITCase.scala | 9 ++-
.../runtime/stream/table/CorrelateITCase.scala | 4 +-
.../stream/table/GroupWindowITCase.scala | 10 +--
.../stream/table/SetOperatorsITCase.scala | 4 +-
.../runtime/stream/table/TableSinkITCase.scala | 7 +--
.../stream/table/TableSourceITCase.scala | 6 +-
.../utils/StreamingWithStateTestBase.scala | 4 +-
.../flink/streaming/api/DataStreamTest.java | 3 +-
.../scala/api/CsvOutputFormatITCase.java | 4 +-
.../scala/api/TextOutputFormatITCase.java | 4 +-
.../streaming/api/scala/CoGroupJoinITCase.scala | 7 +--
.../streaming/api/scala/DataStreamTest.scala | 28 ++++++---
.../streaming/api/scala/SideOutputITCase.scala | 4 +-
.../api/scala/TimeWindowTranslationTest.scala | 4 +-
.../streaming/api/scala/WindowFoldITCase.scala | 6 +-
.../api/scala/WindowReduceITCase.scala | 6 +-
.../util/StreamingMultipleProgramsTestBase.java | 64 --------------------
.../flink/test/util/AbstractTestBase.java | 14 ++---
.../flink/test/util/MiniClusterResource.java | 11 ++--
.../test/util/MultipleProgramsTestBase.java | 48 ++++-----------
.../apache/flink/test/util/TestBaseUtils.java | 2 +-
.../CoStreamCheckpointingITCase.java | 4 +-
.../StreamCheckpointNotifierITCase.java | 4 +-
.../test/state/ManualWindowSpeedITCase.java | 4 +-
.../streaming/api/StreamingOperatorsITCase.java | 4 +-
.../runtime/ChainedRuntimeContextITCase.java | 4 +-
.../streaming/runtime/CoGroupJoinITCase.java | 4 +-
.../test/streaming/runtime/CoStreamITCase.java | 4 +-
.../streaming/runtime/DataStreamPojoITCase.java | 4 +-
.../streaming/runtime/DirectedOutputITCase.java | 4 +-
.../test/streaming/runtime/IterateITCase.java | 26 ++++----
.../streaming/runtime/OutputSplitterITCase.java | 4 +-
.../streaming/runtime/PartitionerITCase.java | 4 +-
.../streaming/runtime/SelfConnectionITCase.java | 4 +-
.../streaming/runtime/SideOutputITCase.java | 4 +-
.../streaming/runtime/StateBackendITCase.java | 4 +-
.../runtime/StreamTaskTimerITCase.java | 4 +-
.../streaming/runtime/WindowFoldITCase.java | 4 +-
.../sessionwindows/SessionWindowITCase.java | 4 +-
57 files changed, 253 insertions(+), 321 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index 297bc5d..b90e8ed 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.InstantiationUtil;
import org.elasticsearch.client.Client;
@@ -31,6 +31,8 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Collections;
@@ -44,7 +46,9 @@ import static org.junit.Assert.fail;
/**
* Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} implementations.
*/
-public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgramsTestBase {
+public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkTestBase.class);
protected static final String CLUSTER_NAME = "test-cluster";
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 10d1846..78f643f 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -32,9 +32,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
@@ -83,13 +84,14 @@ import java.util.Map;
* @deprecated should be removed with the {@link RollingSink}.
*/
@Deprecated
-public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
+public class RollingSinkITCase extends TestLogger {
protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkITCase.class);
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
+ protected static MiniClusterResource miniClusterResource;
protected static MiniDFSCluster hdfsCluster;
protected static org.apache.hadoop.fs.FileSystem dfs;
protected static String hdfsURI;
@@ -98,7 +100,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
protected static File dataDir;
@BeforeClass
- public static void createHDFS() throws IOException {
+ public static void setup() throws Exception {
LOG.info("In RollingSinkITCase: Starting MiniDFSCluster ");
@@ -113,12 +115,22 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
hdfsURI = "hdfs://"
+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ "/";
+
+ miniClusterResource = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ new org.apache.flink.configuration.Configuration(),
+ 1,
+ 4));
}
@AfterClass
- public static void destroyHDFS() {
+ public static void teardown() throws Exception {
LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster ");
hdfsCluster.shutdown();
+
+ if (miniClusterResource != null) {
+ miniClusterResource.after();
+ }
}
/**
@@ -926,6 +938,8 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
}
private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
+ private static final long serialVersionUID = 761584896826819477L;
+
private String key;
private String expect;
public StreamWriterWithConfigCheck(String key, String expect) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 7595ac0..b76d087 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.modules.HadoopModule;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.test.util.TestingSecurityContext;
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -94,19 +93,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
* and out-of-order sequence for secure cluster
*/
@BeforeClass
- public static void setup() throws Exception {}
-
- @AfterClass
- public static void teardown() throws Exception {}
-
- @BeforeClass
- public static void createHDFS() throws IOException {}
-
- @AfterClass
- public static void destroyHDFS() {}
-
- @BeforeClass
- public static void startSecureCluster() throws Exception {
+ public static void setup() throws Exception {
skipIfHadoopVersionIsNotAppropriate();
@@ -158,20 +145,29 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ "/";
- startSecureFlinkClusterWithRecoveryModeEnabled();
+ Configuration configuration = startSecureFlinkClusterWithRecoveryModeEnabled();
+
+ miniClusterResource = new MiniClusterResource(new MiniClusterResource.MiniClusterResourceConfiguration(
+ configuration,
+ 1,
+ 4));
+
+ miniClusterResource.before();
}
@AfterClass
- public static void teardownSecureCluster() throws Exception {
+ public static void teardown() throws Exception {
LOG.info("tearing down secure cluster environment");
- TestStreamEnvironment.unsetAsContext();
- stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-
if (hdfsCluster != null) {
hdfsCluster.shutdown();
}
+ if (miniClusterResource != null) {
+ miniClusterResource.after();
+ miniClusterResource = null;
+ }
+
SecureTestEnvironment.cleanup();
}
@@ -208,30 +204,26 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
}
- private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
+ private static Configuration startSecureFlinkClusterWithRecoveryModeEnabled() {
try {
LOG.info("Starting Flink and ZK in secure mode");
dfs.mkdirs(new Path("/flink/checkpoints"));
dfs.mkdirs(new Path("/flink/recovery"));
- org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
-
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
- config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
- config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
- config.setString(CoreOptions.STATE_BACKEND, "filesystem");
- config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
- config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
- config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
+ final Configuration result = new Configuration();
- SecureTestEnvironment.populateFlinkSecureConfigurations(config);
+ result.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
+ result.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
+ result.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ result.setString(CoreOptions.STATE_BACKEND, "filesystem");
+ result.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
+ result.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
+ result.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
- cluster = TestBaseUtils.startCluster(config, false);
- TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
+ SecureTestEnvironment.populateFlinkSecureConfigurations(result);
+ return result;
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
index 46102a2..753d813 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -23,9 +23,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.hadoop.fs.FileSystem;
@@ -44,11 +42,9 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
/**
* Integration tests for Hadoop IO formats.
@@ -58,14 +54,14 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
private static final int NUM_PROGRAMS = 2;
- private int curProgId = config.getInteger("ProgramId", -1);
+ private final int curProgId;
private String[] resultPath;
private String[] expectedResult;
private String sequenceFileInPath;
private String sequenceFileInPathNull;
- public HadoopIOFormatsITCase(Configuration config) {
- super(config);
+ public HadoopIOFormatsITCase(int curProgId) {
+ this.curProgId = curProgId;
}
@Before
@@ -143,17 +139,15 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
}
@Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+ public static Collection<Object[]> getConfigurations() {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ Collection<Object[]> programIds = new ArrayList<>(NUM_PROGRAMS);
for (int i = 1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
+ programIds.add(new Object[]{i});
}
- return TestBaseUtils.toParameterList(tConfigs);
+ return programIds;
}
private static class HadoopIOFormatPrograms {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index 145eaaa..db2ad8e 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -44,7 +44,6 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
protected void preSubmit() throws Exception {
textPath = createTempFile("text.txt", WordCountData.TEXT);
resultPath = getTempDirPath("result");
- this.setParallelism(4);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index a23a50d..783a5a6 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -46,7 +46,6 @@ public class HadoopInputOutputITCase extends JavaProgramTestBase {
protected void preSubmit() throws Exception {
textPath = createTempFile("text.txt", WordCountData.TEXT);
resultPath = getTempDirPath("result");
- this.setParallelism(4);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
index d53493c..7152cf2 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.storm.split;
import org.apache.flink.storm.split.SpoutSplitExample.Enrich;
import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.junit.After;
import org.junit.Assert;
@@ -32,7 +32,7 @@ import java.io.IOException;
/**
* Tests for split examples.
*/
-public class SplitITCase extends StreamingMultipleProgramsTestBase {
+public class SplitITCase extends AbstractTestBase {
private String output;
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 7543bab..ee06cd4 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -55,7 +55,6 @@ public class TopSpeedWindowing {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setGlobalJobParameters(params);
- env.setParallelism(1);
@SuppressWarnings({"rawtypes", "serial"})
DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index 4c47d59..cfe899e 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonDa
import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
import org.apache.flink.streaming.test.examples.join.WindowJoinData;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.commons.io.FileUtils;
import org.junit.Test;
@@ -40,7 +40,7 @@ import java.io.File;
/**
* Integration test for streaming programs in Java examples.
*/
-public class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
+public class StreamingExamplesITCase extends AbstractTestBase {
@Test
public void testIterateExample() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index c2f3164..320dd5f 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -17,33 +17,47 @@
package org.apache.flink.streaming.test.examples.windowing;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
/**
* Tests for {@link TopSpeedWindowing}.
*/
-public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
+public class TopSpeedWindowingExampleITCase extends TestLogger {
- protected String textPath;
- protected String resultPath;
+ @ClassRule
+ public static TemporaryFolder temporaryFolder = new TemporaryFolder();
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
- resultPath = getTempDirPath("result");
- }
+ @ClassRule
+ public static MiniClusterResource miniClusterResource = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ new Configuration(),
+ 1,
+ 1));
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath);
- }
+ @Test
+ public void testTopSpeedWindowingExampleITCase() throws Exception {
+ File inputFile = temporaryFolder.newFile();
+ FileUtils.writeFileUtf8(inputFile, TopSpeedWindowingExampleData.CAR_DATA);
+
+ final String resultPath = temporaryFolder.newFolder().toURI().toString();
- @Override
- protected void testProgram() throws Exception {
- TopSpeedWindowing.main(new String[]{
- "--input", textPath,
- "--output", resultPath});
+ TopSpeedWindowing.main(new String[] {
+ "--input", inputFile.getAbsolutePath(),
+ "--output", resultPath});
+
+ compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
index a09b22e..91ee9bf 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.test.socket;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.examples.socket.SocketWindowWordCount;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Test;
@@ -38,7 +38,7 @@ import static org.junit.Assert.fail;
/**
* Tests for {@link SocketWindowWordCount}.
*/
-public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBase {
+public class SocketWindowWordCountITCase extends AbstractTestBase {
@Test
public void testJavaProgram() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index 24d1444..7407294 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -24,7 +24,6 @@ import org.apache.commons.io.FileUtils
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData
import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
@@ -37,7 +36,6 @@ import org.apache.flink.streaming.scala.examples.twitter.TwitterExample
import org.apache.flink.streaming.scala.examples.windowing.{SessionWindowing, WindowWordCount}
import org.apache.flink.streaming.scala.examples.wordcount.WordCount
import org.apache.flink.streaming.test.examples.join.WindowJoinData
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.test.testdata.WordCountData
import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
import org.junit.Test
@@ -45,12 +43,12 @@ import org.junit.Test
/**
* Integration test for streaming programs in Scala examples.
*/
-class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
+class StreamingExamplesITCase extends AbstractTestBase {
@Test
def testIterateExample(): Unit = {
- val inputPath = AbstractTestBase.createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
- val resultPath = AbstractTestBase.getTempDirPath("result")
+ val inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
+ val resultPath = getTempDirPath("result")
// the example is inherently non-deterministic. The iteration timeout of 5000 ms
// is frequently not enough to make the test run stable on CI infrastructure
@@ -99,14 +97,14 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
@Test
def testIncrementalLearningSkeleton(): Unit = {
- val resultPath = AbstractTestBase.getTempDirPath("result")
+ val resultPath = getTempDirPath("result")
IncrementalLearningSkeleton.main(Array("--output", resultPath))
TestBaseUtils.compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath)
}
@Test
def testTwitterExample(): Unit = {
- val resultPath = AbstractTestBase.getTempDirPath("result")
+ val resultPath = getTempDirPath("result")
TwitterExample.main(Array("--output", resultPath))
TestBaseUtils.compareResultsByLinesInMemory(
TwitterExampleData.STREAMING_COUNTS_AS_TUPLES,
@@ -115,7 +113,7 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
@Test
def testSessionWindowing(): Unit = {
- val resultPath = AbstractTestBase.getTempDirPath("result")
+ val resultPath = getTempDirPath("result")
SessionWindowing.main(Array("--output", resultPath))
TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath)
}
@@ -124,8 +122,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
def testWindowWordCount(): Unit = {
val windowSize = "250"
val slideSize = "150"
- val textPath = AbstractTestBase.createTempFile("text.txt", WordCountData.TEXT)
- val resultPath = AbstractTestBase.getTempDirPath("result")
+ val textPath = createTempFile("text.txt", WordCountData.TEXT)
+ val resultPath = getTempDirPath("result")
WindowWordCount.main(Array(
"--input", textPath,
@@ -142,8 +140,8 @@ class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
@Test
def testWordCount(): Unit = {
- val textPath = AbstractTestBase.createTempFile("text.txt", WordCountData.TEXT)
- val resultPath = AbstractTestBase.getTempDirPath("result")
+ val textPath = createTempFile("text.txt", WordCountData.TEXT)
+ val resultPath = getTempDirPath("result")
WordCount.main(Array(
"--input", textPath,
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 81b83a3..4f2383a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Either;
import org.junit.After;
@@ -48,7 +48,7 @@ import java.util.Map;
* End to end tests of both CEP operators and {@link NFA}.
*/
@SuppressWarnings("serial")
-public class CEPITCase extends StreamingMultipleProgramsTestBase {
+public class CEPITCase extends AbstractTestBase {
private String resultPath;
private String expected;
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
index f3d0309..44f89cc 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
@@ -25,12 +25,12 @@ import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.JavaStreamTestData;
import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.junit.Test;
@@ -41,7 +41,7 @@ import java.util.List;
/**
* Integration tests for streaming SQL.
*/
-public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
+public class JavaSqlITCase extends AbstractTestBase {
@Test
public void testRowRegisterRowWithNames() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 820de08..741a3cb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -20,13 +20,13 @@ package org.apache.flink.table.api.stream
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
+import org.apache.flink.test.util.AbstractTestBase
import org.junit.Assert.assertEquals
import org.junit._
-class ExplainTest extends StreamingMultipleProgramsTestBase {
+class ExplainTest extends AbstractTestBase {
private val testFilePath = this.getClass.getResource("/").getFile
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
index c1ad08c..1de2b1e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -19,13 +19,13 @@
package org.apache.flink.table.api.stream.table.validation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, ValidationException}
import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.test.util.AbstractTestBase
import org.junit.Test
-class UnsupportedOpsValidationTest extends StreamingMultipleProgramsTestBase {
+class UnsupportedOpsValidationTest extends AbstractTestBase {
@Test(expected = classOf[ValidationException])
def testSort(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index a301354..c553ee6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -18,8 +18,8 @@
package org.apache.flink.table.runtime.stream
-import java.math.BigDecimal
import java.lang.{Integer => JInt, Long => JLong}
+import java.math.BigDecimal
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -28,15 +28,15 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types}
import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
+import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
+import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
import org.apache.flink.table.runtime.utils.StreamITCase
import org.apache.flink.table.utils.{MemoryTableSinkUtil, TestTableSourceWithTime}
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test
@@ -46,7 +46,7 @@ import scala.collection.mutable
/**
* Tests for access and materialization of time attributes.
*/
-class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
+class TimeAttributesITCase extends AbstractTestBase {
val data = List(
(1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index 30ada56..246ce2e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -20,17 +20,17 @@ package org.apache.flink.table.runtime.stream.sql
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test
import scala.collection.mutable
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
+class TableSourceITCase extends AbstractTestBase {
@Test
def testCsvTableSource(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 46788f5..a20b626 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -20,20 +20,19 @@ package org.apache.flink.table.runtime.stream.table
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Literal
import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, SplitUDF}
-import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, UserDefinedFunctionTestUtils}
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test
import scala.collection.mutable
-class CalcITCase extends StreamingMultipleProgramsTestBase {
+class CalcITCase extends AbstractTestBase {
@Test
def testSimpleSelectAll(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
index 215526d..0f563e6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
@@ -21,19 +21,19 @@ import java.lang.{Boolean => JBoolean}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
import org.apache.flink.table.expressions.utils.{Func18, RichFunc2}
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, _}
import org.apache.flink.table.utils._
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.{Before, Test}
import scala.collection.mutable
-class CorrelateITCase extends StreamingMultipleProgramsTestBase {
+class CorrelateITCase extends AbstractTestBase {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
index 1eebeee..588cff1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
@@ -22,17 +22,17 @@ import java.math.BigDecimal
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
import org.apache.flink.table.functions.aggfunctions.CountAggFunction
import org.apache.flink.table.runtime.stream.table.GroupWindowITCase._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge}
import org.apache.flink.table.runtime.utils.StreamITCase
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test
@@ -43,7 +43,7 @@ import scala.collection.mutable
* We only test some aggregations until better testing of constructed DataStream
* programs is possible.
*/
-class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
+class GroupWindowITCase extends AbstractTestBase {
private val queryConfig = new StreamQueryConfig()
queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 5e15e14..479bce2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -20,18 +20,18 @@ package org.apache.flink.table.runtime.stream.table
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test
import scala.collection.mutable
-class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
+class SetOperatorsITCase extends AbstractTestBase {
@Test
def testUnion(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index b44d8ef..f1badee 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -31,22 +31,21 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
import org.apache.flink.table.sinks._
import org.apache.flink.table.utils.MemoryTableSinkUtil
-import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.junit.Assert._
import org.junit.Test
-import scala.collection.mutable
import scala.collection.JavaConverters._
+import scala.collection.mutable
-class TableSinkITCase extends StreamingMultipleProgramsTestBase {
+class TableSinkITCase extends AbstractTestBase {
@Test
def testInsertIntoRegisteredTableSink(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index c9ea30a..d1a88b7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -29,12 +29,12 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JExecEnv}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.table.utils._
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.junit.Assert._
@@ -43,7 +43,7 @@ import org.junit.Test
import scala.collection.JavaConverters._
import scala.collection.mutable
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
+class TableSourceITCase extends AbstractTestBase {
@Test(expected = classOf[TableException])
def testInvalidDatastreamType(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
index 8c41f22..5cfab4a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
@@ -18,11 +18,11 @@
package org.apache.flink.table.runtime.utils
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
import org.junit.Rule
import org.junit.rules.TemporaryFolder
-class StreamingWithStateTestBase extends StreamingMultipleProgramsTestBase {
+class StreamingWithStateTestBase extends AbstractTestBase {
val _tempFolder = new TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index b76ade7..6fb06d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -70,6 +70,7 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
import org.hamcrest.core.StringStartsWith;
import org.junit.Assert;
@@ -90,7 +91,7 @@ import static org.junit.Assert.fail;
* Tests for {@link DataStream}.
*/
@SuppressWarnings("serial")
-public class DataStreamTest {
+public class DataStreamTest extends TestLogger {
/**
* Tests union functionality. This ensures that self-unions and unions of streams
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
index fb7c765..2311092 100644
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
+++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.scala.api;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
import org.junit.After;
import org.junit.Before;
@@ -34,7 +34,7 @@ import static org.junit.Assert.fail;
/**
* IT cases for the {@link org.apache.flink.api.java.io.CsvOutputFormat}.
*/
-public class CsvOutputFormatITCase extends StreamingMultipleProgramsTestBase {
+public class CsvOutputFormatITCase extends AbstractTestBase {
protected String resultPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
index 84b81e2..c2e450a 100644
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
+++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.scala.api;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
import org.junit.After;
import org.junit.Before;
@@ -34,7 +34,7 @@ import static org.junit.Assert.fail;
/**
* IT cases for the {@link org.apache.flink.api.java.io.TextOutputFormat}.
*/
-public class TextOutputFormatITCase extends StreamingMultipleProgramsTestBase {
+public class TextOutputFormatITCase extends AbstractTestBase {
protected String resultPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index fddbe00..5412e8e 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -27,14 +27,13 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-
-import org.junit.Test
+import org.apache.flink.test.util.AbstractTestBase
import org.junit.Assert._
+import org.junit.Test
import scala.collection.mutable
-class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
+class CoGroupJoinITCase extends AbstractTestBase {
@Test
def testCoGroup(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 60c609d..6158c8e 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala
import java.lang
import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.operators.ResourceSpec
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.functions.ProcessFunction
@@ -32,12 +31,12 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.streaming.runtime.partitioner._
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.util.Collector
import org.junit.Assert._
import org.junit.Test
-class DataStreamTest extends StreamingMultipleProgramsTestBase {
+class DataStreamTest extends AbstractTestBase {
@Test
def testNaming(): Unit = {
@@ -242,7 +241,8 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
*/
@Test
def testParallelism() {
- val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10)
+ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+ val parallelism = env.getParallelism
val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
val map = src.map(x => (0L, 0L))
@@ -255,9 +255,12 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val sink = map.addSink(x => {})
assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
- assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+ assert(parallelism == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
- assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+ assert(parallelism == env
+ .getStreamGraph
+ .getStreamNode(sink.getTransformation.getId)
+ .getParallelism)
try {
src.setParallelism(3)
@@ -268,18 +271,23 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
}
- env.setParallelism(7)
+ val newParallelism = parallelism - 1
+
+ env.setParallelism(newParallelism)
// the parallelism does not change since some windowing code takes the parallelism from
// input operations and that cannot change dynamically
assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
- assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+ assert(parallelism == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
- assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+ assert(parallelism == env
+ .getStreamGraph
+ .getStreamNode(sink.getTransformation.getId)
+ .getParallelism)
val parallelSource = env.generateSequence(0, 0)
parallelSource.print()
- assert(7 == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)
+ assert(newParallelism == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)
parallelSource.setParallelism(3)
assert(3 == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
index 8e66171..e844928 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
@@ -27,8 +27,8 @@ import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.test.streaming.runtime.util.TestListResultSink
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.util.Collector
import org.junit.Assert._
import org.junit.Test
@@ -36,7 +36,7 @@ import org.junit.Test
/**
* Integration test for streaming programs using side outputs.
*/
-class SideOutputITCase extends StreamingMultipleProgramsTestBase {
+class SideOutputITCase extends AbstractTestBase {
/**
* Test ProcessFunction side output.
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
index 35a56d7..db0fb71 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.util.Collector
import org.junit.Assert._
import org.junit.Test
@@ -39,7 +39,7 @@ import org.junit.Test
* These tests verify that the api calls on [[WindowedStream]] that use the "time" shortcut
* instantiate the correct window operator.
*/
-class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+class TimeWindowTranslationTest extends AbstractTestBase {
/**
* Verifies that calls to timeWindow() instantiate a regular
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index dc38758..ef27685 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -31,9 +31,9 @@ import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.{Ignore, Test}
+import org.apache.flink.test.util.AbstractTestBase
import org.junit.Assert._
+import org.junit.Test
import scala.collection.mutable
@@ -41,7 +41,7 @@ import scala.collection.mutable
* Tests for Folds over windows. These also test whether OutputTypeConfigurable functions
* work for windows, because FoldWindowFunction is OutputTypeConfigurable.
*/
-class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
+class WindowFoldITCase extends AbstractTestBase {
@Test
def testFoldWindow(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index ee1dbfd..b2137f5 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -31,9 +31,9 @@ import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.AbstractTestBase
import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
import scala.collection.mutable
@@ -41,7 +41,7 @@ import scala.collection.mutable
* Tests for Folds over windows. These also test whether OutputTypeConfigurable functions
* work for windows, because FoldWindowFunction is OutputTypeConfigurable.
*/
-class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
+class WindowReduceITCase extends AbstractTestBase {
@Test
def testReduceWindow(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
deleted file mode 100644
index aa6e618..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for streaming unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup and
- * shutdown of the Flink clusters (including actor systems, etc) usually dominates
- * the execution of the actual tests.
- *
- * <p>To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the StreamExecutionEnvironment from
- * the context:
- *
- * <pre>
- * {@literal @}Test
- * public void someTest() {
- * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * // test code
- * env.execute();
- * }
- *
- * {@literal @}Test
- * public void anotherTest() {
- * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * // test code
- * env.execute();
- * }
- *
- * </pre>
- */
-public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
-
- // ------------------------------------------------------------------------
- // The mini cluster that is shared across tests
- // ------------------------------------------------------------------------
-
- protected static final int DEFAULT_PARALLELISM = 4;
-
- protected static final Logger LOG = LoggerFactory.getLogger(StreamingMultipleProgramsTestBase.class);
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index bbd250d..65b351d 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -23,8 +23,6 @@ import org.apache.flink.util.FileUtils;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -42,7 +40,7 @@ import java.io.IOException;
* <pre>
* {@literal @}Test
* public void someTest() {
- * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
* // test code
* env.execute();
* }
@@ -58,8 +56,6 @@ import java.io.IOException;
*/
public abstract class AbstractTestBase extends TestBaseUtils {
- protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
-
private static final int DEFAULT_PARALLELISM = 4;
@ClassRule
@@ -77,17 +73,17 @@ public abstract class AbstractTestBase extends TestBaseUtils {
// Temporary File Utilities
// --------------------------------------------------------------------------------------------
- public static String getTempDirPath(String dirName) throws IOException {
+ public String getTempDirPath(String dirName) throws IOException {
File f = createAndRegisterTempFile(dirName);
return f.toURI().toString();
}
- public static String getTempFilePath(String fileName) throws IOException {
+ public String getTempFilePath(String fileName) throws IOException {
File f = createAndRegisterTempFile(fileName);
return f.toURI().toString();
}
- public static String createTempFile(String fileName, String contents) throws IOException {
+ public String createTempFile(String fileName, String contents) throws IOException {
File f = createAndRegisterTempFile(fileName);
if (!f.getParentFile().exists()) {
f.getParentFile().mkdirs();
@@ -97,7 +93,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
return f.toURI().toString();
}
- public static File createAndRegisterTempFile(String fileName) throws IOException {
+ public File createAndRegisterTempFile(String fileName) throws IOException {
return new File(TEMPORARY_FOLDER.newFolder(), fileName);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index c8872ac..20dbebb 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -19,6 +19,7 @@
package org.apache.flink.test.util;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -60,11 +61,13 @@ public class MiniClusterResource extends ExternalResource {
@Override
public void before() throws Exception {
+ final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
+
+ configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.numberSlotsPerTaskManager);
+
localFlinkMiniCluster = TestBaseUtils.startCluster(
- miniClusterResourceConfiguration.getNumberTaskManagers(),
- miniClusterResourceConfiguration.getNumberSlotsPerTaskManager(),
- false,
- false,
+ configuration,
true);
numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 06792ea..10039e6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,12 +18,8 @@
package org.apache.flink.test.util;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.runners.Parameterized;
import java.util.Arrays;
@@ -57,7 +53,7 @@ import java.util.Collection;
*
* }</pre>
*/
-public class MultipleProgramsTestBase extends TestBaseUtils {
+public class MultipleProgramsTestBase extends AbstractTestBase {
/**
* Enum that defines which execution environment to run the next test on:
@@ -70,16 +66,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
}
// ------------------------------------------------------------------------
- // The mini cluster that is shared across tests
- // ------------------------------------------------------------------------
-
- protected static final int DEFAULT_PARALLELISM = 4;
-
- protected static boolean startWebServer = false;
-
- protected static LocalFlinkMiniCluster cluster = null;
-
- // ------------------------------------------------------------------------
protected final TestExecutionMode mode;
@@ -93,12 +79,21 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
@Before
public void setupEnvironment() {
+ TestEnvironment testEnvironment;
switch(mode){
case CLUSTER:
- new TestEnvironment(cluster, 4, false).setAsContext();
+ // This only works because of the quirks we built in the TestEnvironment.
+ // We should refactor this in the future!!!
+ testEnvironment = miniClusterResource.getTestEnvironment();
+ testEnvironment.getConfig().disableObjectReuse();
+ testEnvironment.setAsContext();
break;
case CLUSTER_OBJECT_REUSE:
- new TestEnvironment(cluster, 4, true).setAsContext();
+ // This only works because of the quirks we built in the TestEnvironment.
+ // We should refactor this in the future!!!
+ testEnvironment = miniClusterResource.getTestEnvironment();
+ testEnvironment.getConfig().enableObjectReuse();
+ testEnvironment.setAsContext();
break;
case COLLECTION:
new CollectionTestEnvironment().setAsContext();
@@ -120,25 +115,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
}
// ------------------------------------------------------------------------
- // Cluster setup & teardown
- // ------------------------------------------------------------------------
-
- @BeforeClass
- public static void setup() throws Exception {
- cluster = TestBaseUtils.startCluster(
- 1,
- DEFAULT_PARALLELISM,
- startWebServer,
- false,
- true);
- }
-
- @AfterClass
- public static void teardown() throws Exception {
- stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
- }
-
- // ------------------------------------------------------------------------
// Parametrization lets the tests run in cluster and collection mode
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index d2237ad..5a96326 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -459,7 +459,7 @@ public class TestBaseUtils extends TestLogger {
throw new IllegalArgumentException("This path does not denote a local file.");
}
} catch (URISyntaxException | NullPointerException e) {
- throw new IllegalArgumentException("This path does not describe a valid local file URI.");
+ throw new IllegalArgumentException("This path does not describe a valid local file URI.", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index d224905..b207de8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Collector;
@@ -57,7 +57,7 @@ import static org.junit.Assert.assertTrue;
* state reflects the "exactly once" semantics.
*/
@SuppressWarnings({"serial", "deprecation"})
-public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBase {
+public class CoStreamCheckpointingITCase extends AbstractTestBase {
private static final long NUM_STRINGS = 10_000L;
private static final int PARALLELISM = 4;
http://git-wip-us.apache.org/repos/asf/flink/blob/b90210e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 16d8b54..7b058a0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -69,7 +69,7 @@ import static org.junit.Assert.fail;
* successfully completed checkpoint.
*/
@SuppressWarnings("serial")
-public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTestBase {
+public class StreamCheckpointNotifierITCase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);