You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/26 16:23:21 UTC
[4/4] flink git commit: [tests] Adjust tests for dedicated streaming
mode and clean up test bases.
[tests] Adjust tests for dedicated streaming mode and clean up test bases.
This closes #718
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04377224
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04377224
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04377224
Branch: refs/heads/master
Commit: 0437722449c59fdc21fc5d18a59e5e0b961208af
Parents: efec229
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 22 17:24:19 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 26 16:22:23 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/jobmanager/JobManager.scala | 3 +-
.../JobManagerProcessReapingTest.java | 3 +-
.../jobmanager/JobManagerStartupTest.java | 11 ++-
.../flink/runtime/jobmanager/JobSubmitTest.java | 3 +-
...askManagerComponentsStartupShutdownTest.java | 4 +-
.../TaskManagerProcessReapingTest.java | 5 +-
.../TaskManagerRegistrationTest.java | 9 +-
.../runtime/taskmanager/TaskManagerTest.java | 12 ++-
.../taskmanager/TestManagerStartupTest.java | 10 ++-
.../jobmanager/JobManagerRegistrationTest.scala | 6 +-
.../runtime/testingUtils/TestingCluster.scala | 24 +++--
.../runtime/testingUtils/TestingUtils.scala | 28 +-----
.../api/complex/ComplexIntegrationTest.java | 9 +-
.../util/StreamingMultipleProgramsTestBase.java | 53 ++++++-----
.../util/StreamingProgramTestBase.java | 2 +-
.../scala/table/test/AggregationsITCase.scala | 20 ++---
.../flink/api/scala/table/test/AsITCase.scala | 21 ++---
.../api/scala/table/test/CastingITCase.scala | 14 +--
.../scala/table/test/ExpressionsITCase.scala | 22 ++---
.../api/scala/table/test/FilterITCase.scala | 20 ++---
.../table/test/GroupedAggreagationsITCase.scala | 14 +--
.../flink/api/scala/table/test/JoinITCase.scala | 22 ++---
.../api/scala/table/test/SelectITCase.scala | 22 ++---
.../table/test/StringExpressionsITCase.scala | 16 ++--
.../util/AbstractMultipleProgramsTestBase.java | 84 ------------------
.../flink/test/util/AbstractTestBase.java | 14 +--
.../test/util/MultipleProgramsTestBase.java | 52 ++++++++++-
.../apache/flink/test/util/TestBaseUtils.java | 92 +++++++++++---------
.../apache/flink/test/util/FlinkTestBase.scala | 8 +-
.../test/util/ForkableFlinkMiniCluster.scala | 21 +++--
.../AbstractProcessFailureRecoveryTest.java | 5 +-
.../recovery/ProcessFailureCancelingITCase.java | 3 +-
.../flink/test/web/WebFrontendITCase.java | 9 +-
.../api/scala/actions/CountCollectITCase.scala | 2 +-
.../scala/functions/ClosureCleanerITCase.scala | 6 +-
.../hadoop/mapred/WordCountMapredITCase.scala | 5 +-
.../mapreduce/WordCountMapreduceITCase.scala | 5 +-
.../scala/io/ScalaCsvReaderWithPOJOITCase.scala | 6 +-
.../api/scala/operators/AggregateITCase.scala | 6 +-
.../api/scala/operators/CoGroupITCase.scala | 6 +-
.../flink/api/scala/operators/CrossITCase.scala | 6 +-
.../api/scala/operators/DistinctITCase.scala | 6 +-
.../api/scala/operators/ExamplesITCase.scala | 13 ++-
.../api/scala/operators/FilterITCase.scala | 10 +--
.../api/scala/operators/FirstNITCase.scala | 6 +-
.../api/scala/operators/FlatMapITCase.scala | 6 +-
.../scala/operators/GroupCombineITCase.scala | 5 +-
.../api/scala/operators/GroupReduceITCase.scala | 6 +-
.../flink/api/scala/operators/JoinITCase.scala | 6 +-
.../flink/api/scala/operators/MapITCase.scala | 6 +-
.../api/scala/operators/PartitionITCase.scala | 6 +-
.../api/scala/operators/ReduceITCase.scala | 6 +-
.../api/scala/operators/SumMinMaxITCase.scala | 2 +-
.../flink/api/scala/operators/UnionITCase.scala | 6 +-
.../scala/runtime/ScalaSpecialTypesITCase.scala | 22 ++---
.../yarn/appMaster/YarnTaskManagerRunner.java | 15 +++-
.../apache/flink/yarn/ApplicationMaster.scala | 21 +++--
57 files changed, 421 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ba819ca..d5df633 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -945,7 +945,8 @@ object JobManager {
* @param args command line arguments
* @return 5-tuple of configuration, execution mode, streaming mode, and the listening address (host name and port)
*/
- def parseArgs(args: Array[String]): (Configuration, JobManagerMode, StreamingMode, String, Int) = {
+ def parseArgs(args: Array[String]):
+ (Configuration, JobManagerMode, StreamingMode, String, Int) = {
val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
head("Flink JobManager")
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index a332ee1..be73bf5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -27,6 +27,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -183,7 +184,7 @@ public class JobManagerProcessReapingTest {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
- JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", port);
+ JobManager.runJobManager(config, JobManagerMode.CLUSTER, StreamingMode.BATCH_ONLY, "localhost", port);
System.exit(0);
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 070e376..e7665a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -28,6 +28,7 @@ import com.google.common.io.Files;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.net.NetUtils;
import org.junit.After;
@@ -80,7 +81,8 @@ public class JobManagerStartupTest {
}
try {
- JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER, "localhost", portNum);
+ JobManager.runJobManager(new Configuration(), JobManagerMode.CLUSTER,
+ StreamingMode.BATCH_ONLY, "localhost", portNum);
fail("this should throw an exception");
}
catch (Exception e) {
@@ -94,7 +96,9 @@ public class JobManagerStartupTest {
try {
portOccupier.close();
}
- catch (Throwable t) {}
+ catch (Throwable t) {
+ // ignore
+ }
}
}
@@ -117,7 +121,8 @@ public class JobManagerStartupTest {
failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, nonExistDirectory);
try {
- JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER, "localhost", portNum);
+ JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER,
+ StreamingMode.BATCH_ONLY, "localhost", portNum);
fail("this should fail with an exception");
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index e69687f..7b6d688 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
@@ -61,7 +62,7 @@ public class JobSubmitTest {
scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.empty();
jobManagerSystem = AkkaUtils.createActorSystem(config, listeningAddress);
- jobManager = JobManager.startJobManagerActors(config, jobManagerSystem)._1();
+ jobManager = JobManager.startJobManagerActors(config, jobManagerSystem, StreamingMode.BATCH_ONLY)._1();
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 7d4994d..a67cd00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -27,6 +27,7 @@ import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -68,7 +69,8 @@ public class TaskManagerComponentsStartupShutdownTest {
try {
actorSystem = AkkaUtils.createLocalActorSystem(config);
- final ActorRef jobManager = JobManager.startJobManagerActors(config, actorSystem)._1();
+ final ActorRef jobManager = JobManager.startJobManagerActors(config, actorSystem,
+ StreamingMode.BATCH_ONLY)._1();
// create the components for the TaskManager manually
final TaskManagerConfiguration tmConfig = new TaskManagerConfiguration(
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 91fadca..c55a721 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -24,6 +24,7 @@ import akka.actor.PoisonPill;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
@@ -84,7 +85,7 @@ public class TaskManagerProcessReapingTest {
jmActorSystem = AkkaUtils.createActorSystem(
new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
- JobManager.startJobManagerActors(new Configuration(), jmActorSystem);
+ JobManager.startJobManagerActors(new Configuration(), jmActorSystem, StreamingMode.BATCH_ONLY);
final int taskManagerPort = NetUtils.getAvailablePort();
@@ -198,7 +199,7 @@ public class TaskManagerProcessReapingTest {
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
- TaskManager.runTaskManager("localhost", taskManagerPort, cfg);
+ TaskManager.runTaskManager("localhost", taskManagerPort, cfg, StreamingMode.BATCH_ONLY);
// wait forever
Object lock = new Object();
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 69964ea..3e5f2cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -28,6 +28,7 @@ import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmanager.JobManager;
@@ -152,7 +153,8 @@ public class TaskManagerRegistrationTest {
Thread.sleep(6000);
// now start the JobManager, with the regular akka URL
- final ActorRef jobManager = JobManager.startJobManagerActors(new Configuration(), actorSystem)._1();
+ final ActorRef jobManager =
+ JobManager.startJobManagerActors(new Configuration(), actorSystem, StreamingMode.BATCH_ONLY)._1();
// check that the TaskManagers are registered
Future<Object> responseFuture = Patterns.ask(
@@ -371,6 +373,7 @@ public class TaskManagerRegistrationTest {
NONE_STRING, // no actor name -> random
new Some<String>(jobManager.path().toString()), // job manager path
false, // init network stack !!!
+ StreamingMode.BATCH_ONLY,
TaskManager.class);
watch(taskManager);
@@ -415,7 +418,8 @@ public class TaskManagerRegistrationTest {
private static ActorRef startJobManager() throws Exception {
// start the actors. don't give names, so they get generated names and we
// avoid conflicts with the actor names
- return JobManager.startJobManagerActors(new Configuration(), actorSystem, NONE_STRING, NONE_STRING)._1();
+ return JobManager.startJobManagerActors(new Configuration(), actorSystem,
+ NONE_STRING, NONE_STRING, StreamingMode.BATCH_ONLY)._1();
}
private static ActorRef startTaskManager(ActorRef jobManager) throws Exception {
@@ -430,6 +434,7 @@ public class TaskManagerRegistrationTest {
NONE_STRING, // no actor name -> random
new Some<String>(jobManagerUrl), // job manager path
true, // local network stack only
+ StreamingMode.BATCH_ONLY,
TaskManager.class);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index a308c81..d33dcd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -27,9 +27,11 @@ import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
+
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -59,11 +61,14 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -903,7 +908,8 @@ public class TaskManagerTest {
return createTaskManager(jobManager, waitForRegistration, true, NetUtils.getAvailablePort());
}
- public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration, boolean useLocalCommunication, int dataPort) {
+ public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration,
+ boolean useLocalCommunication, int dataPort) {
ActorRef taskManager = null;
try {
Configuration cfg = new Configuration();
@@ -916,7 +922,9 @@ public class TaskManagerTest {
cfg, system, "localhost",
Option.<String>empty(),
jobMangerUrl,
- useLocalCommunication, TestingTaskManager.class);
+ useLocalCommunication,
+ StreamingMode.BATCH_ONLY,
+ TestingTaskManager.class);
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
index 2b945e1..a033eb1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TestManagerStartupTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.StreamingMode;
import org.junit.Test;
import java.io.File;
@@ -53,7 +54,8 @@ public class TestManagerStartupTest {
final int port = blocker.getLocalPort();
try {
- TaskManager.runTaskManager(localHostName, port, new Configuration(), TaskManager.class);
+ TaskManager.runTaskManager(localHostName, port, new Configuration(),
+ StreamingMode.BATCH_ONLY, TaskManager.class);
fail("This should fail with an IOException");
}
catch (IOException e) {
@@ -101,7 +103,7 @@ public class TestManagerStartupTest {
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
try {
- TaskManager.runTaskManager("localhost", 0, cfg);
+ TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
fail("Should fail synchronously with an exception");
}
catch (IOException e) {
@@ -138,7 +140,7 @@ public class TestManagerStartupTest {
// something invalid
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42);
try {
- TaskManager.runTaskManager("localhost", 0, cfg);
+ TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
fail("Should fail synchronously with an exception");
}
catch (IllegalConfigurationException e) {
@@ -150,7 +152,7 @@ public class TestManagerStartupTest {
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE) >> 20;
cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize);
try {
- TaskManager.runTaskManager("localhost", 0, cfg);
+ TaskManager.runTaskManager("localhost", 0, cfg, StreamingMode.BATCH_ONLY);
fail("Should fail synchronously with an exception");
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 5fde5ea..5ae6b5b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -23,6 +23,7 @@ import java.net.InetAddress
import akka.actor._
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
@@ -122,8 +123,9 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
}
private def startTestingJobManager(system: ActorSystem): ActorRef = {
- val (jm: ActorRef, _) = JobManager.startJobManagerActors(
- new Configuration(), _system, None, None)
+ val (jm: ActorRef, _) = JobManager.startJobManagerActors(new Configuration(), _system,
+ None, None,
+ StreamingMode.BATCH_ONLY)
jm
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9f9fe93..a904f60 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.{ActorRef, Props, ActorSystem}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
@@ -33,9 +34,19 @@ import org.apache.flink.runtime.taskmanager.TaskManager
* @param singleActorSystem true if all actors shall be running in the same [[ActorSystem]],
* otherwise false
*/
-class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
- extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
+class TestingCluster(userConfiguration: Configuration,
+ singleActorSystem: Boolean,
+ streamingMode: StreamingMode)
+ extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
+
+ def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+ = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+
+ def this(userConfiguration: Configuration) = this(userConfiguration, true)
+
+ // --------------------------------------------------------------------------
+
override def generateConfiguration(userConfig: Configuration): Configuration = {
val cfg = new Configuration()
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
@@ -52,13 +63,13 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
executionRetries, delayBetweenRetries,
timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
-
+
val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
-
+
val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
libraryCacheManager, archive, accumulatorManager, executionRetries,
- delayBetweenRetries, timeout) with TestingJobManager)
+ delayBetweenRetries, timeout, streamingMode) with TestingJobManager)
actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
}
@@ -72,12 +83,13 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
} else {
None
}
-
+
TaskManager.startTaskManagerComponentsAndActor(configuration, system,
HOSTNAME,
Some(tmActorName),
jobManagerPath,
numTaskManagers == 1,
+ streamingMode,
classOf[TestingTaskManager])
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 63dce31..3611633 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.CallingThreadDispatcher
+
import com.typesafe.config.ConfigFactory
+
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
-import org.apache.flink.runtime.taskmanager.TaskManager
-import scala.concurrent.duration._
+import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
@@ -55,28 +56,7 @@ object TestingUtils {
}
def getDefaultTestingActorSystemConfig = testConfig
-
-
- def startTestingTaskManagerWithConfiguration(hostname: String,
- jobManagerURL: String,
- config: Configuration,
- system: ActorSystem) : ActorRef = {
-
-
- TaskManager.startTaskManagerComponentsAndActor(config, system,
- hostname,
- None, // random actor name
- Some(jobManagerURL), // job manager
- true, classOf[TestingTaskManager])
- }
-
- def startTestingTaskManager(jobManager: ActorRef, system: ActorSystem): ActorRef = {
-
- val jmURL = jobManager.path.toString
- val config = new Configuration()
-
- startTestingTaskManagerWithConfiguration("localhost", jmURL, config, system)
- }
+
def startTestingCluster(numSlots: Int, numTMs: Int = 1,
timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 738654a..6fdd839 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -52,8 +52,6 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import java.io.Serializable;
import java.text.SimpleDateFormat;
@@ -65,7 +63,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
-@RunWith(Parameterized.class)
+@SuppressWarnings("serial")
public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
// *************************************************************************
@@ -76,10 +74,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
private String resultPath2;
private String expected1;
private String expected2;
-
- public ComplexIntegrationTest(TestExecutionMode mode) {
- super(mode);
- }
+
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 36e62f9..945ac07 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -18,12 +18,12 @@
package org.apache.flink.streaming.util;
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase;
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode;
-import org.junit.runners.Parameterized;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
-import java.util.ArrayList;
-import java.util.Collection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
/**
* Base class for streaming unit tests that run multiple tests and want to reuse the same
@@ -53,25 +53,34 @@ import java.util.Collection;
*
* }</pre>
*/
-public class StreamingMultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
- public StreamingMultipleProgramsTestBase(TestExecutionMode mode) {
- super(mode);
- switch(this.mode){
- case CLUSTER:
- TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
- clusterEnv.setAsContext();
- break;
- case COLLECTION:
- throw new UnsupportedOperationException("Flink streaming currently has no collection execution backend.");
- }
+ // ------------------------------------------------------------------------
+ // The mini cluster that is shared across tests
+ // ------------------------------------------------------------------------
+
+ protected static final int DEFAULT_PARALLELISM = 4;
+
+ protected static ForkableFlinkMiniCluster cluster = null;
+
+ // ------------------------------------------------------------------------
+
+ public StreamingMultipleProgramsTestBase() {
+ TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
+ clusterEnv.setAsContext();
+ }
+
+ // ------------------------------------------------------------------------
+ // Cluster setup & teardown
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void setup() throws Exception{
+ cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false);
}
- @Parameterized.Parameters(name = "Execution mode = {0}")
- public static Collection<TestExecutionMode[]> executionModes() {
- TestExecutionMode[] tems = new TestExecutionMode[]{TestExecutionMode.CLUSTER};
- ArrayList<TestExecutionMode[]> temsList = new ArrayList<TestExecutionMode[]>();
- temsList.add(tems);
- return temsList;
+ @AfterClass
+ public static void teardown() throws Exception {
+ stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index 0446b61..23be327 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.AbstractTestBase;
-import org.junit.Assert;
+import org.junit.Assert;
import org.junit.Test;
public abstract class StreamingProgramTestBase extends AbstractTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 38be85e..3b7ab8d 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testAggregationTypes: Unit = {
+ def testAggregationTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -62,7 +62,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@Test(expected = classOf[ExpressionException])
- def testAggregationOnNonExistingField: Unit = {
+ def testAggregationOnNonExistingField(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -74,7 +74,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@Test
- def testWorkingAggregationDataTypes: Unit = {
+ def testWorkingAggregationDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(
@@ -88,7 +88,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@Test
- def testAggregationWithArithmetic: Unit = {
+ def testAggregationWithArithmetic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable
@@ -100,7 +100,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@Test(expected = classOf[ExpressionException])
- def testNonWorkingAggregationDataTypes: Unit = {
+ def testNonWorkingAggregationDataTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("Hello", 1)).toTable
@@ -112,7 +112,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
}
@Test(expected = classOf[ExpressionException])
- def testNoNestedAggregations: Unit = {
+ def testNoNestedAggregations(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("Hello", 1)).toTable
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 3a0cc69..c6259ec 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -32,6 +32,7 @@ import org.junit.runners.Parameterized
@RunWith(classOf[Parameterized])
class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+
private var resultPath: String = null
private var expected: String = ""
private val _tempFolder = new TemporaryFolder()
@@ -45,12 +46,12 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testAs: Unit = {
+ def testAs(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
@@ -66,7 +67,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@Test(expected = classOf[ExpressionException])
- def testAsWithToFewFields: Unit = {
+ def testAsWithToFewFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
@@ -77,7 +78,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@Test(expected = classOf[ExpressionException])
- def testAsWithToManyFields: Unit = {
+ def testAsWithToManyFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
@@ -88,7 +89,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@Test(expected = classOf[ExpressionException])
- def testAsWithAmbiguousFields: Unit = {
+ def testAsWithAmbiguousFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
@@ -99,7 +100,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@Test(expected = classOf[ExpressionException])
- def testAsWithNonFieldReference1: Unit = {
+ def testAsWithNonFieldReference1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
// as can only have field references
@@ -111,7 +112,7 @@ class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
}
@Test(expected = classOf[ExpressionException])
- def testAsWithNonFieldReference2: Unit = {
+ def testAsWithNonFieldReference2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
// as can only have field references
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 9557985..736cf68 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -21,8 +21,8 @@ package org.apache.flink.api.scala.table.test
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -43,12 +43,12 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testAutoCastToString: Unit = {
+ def testAutoCastToString(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
@@ -60,7 +60,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
}
@Test
- def testNumericAutoCastInArithmetic: Unit = {
+ def testNumericAutoCastInArithmetic(): Unit = {
// don't test everything, just some common cast directions
@@ -74,7 +74,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
}
@Test
- def testNumericAutoCastInComparison: Unit = {
+ def testNumericAutoCastInComparison(): Unit = {
// don't test everything, just some common cast directions
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 8c60acf..d9de287 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.table.ExpressionException
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -44,12 +44,12 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testArithmetic: Unit = {
+ def testArithmetic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((5, 10)).as('a, 'b)
@@ -61,7 +61,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test
- def testLogic: Unit = {
+ def testLogic(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((5, true)).as('a, 'b)
@@ -73,7 +73,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test
- def testComparisons: Unit = {
+ def testComparisons(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
@@ -85,7 +85,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test
- def testBitwiseOperations: Unit = {
+ def testBitwiseOperations(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
@@ -98,7 +98,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test
- def testBitwiseWithAutocast: Unit = {
+ def testBitwiseWithAutocast(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
@@ -111,7 +111,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test(expected = classOf[ExpressionException])
- def testBitwiseWithNonWorkingAutocast: Unit = {
+ def testBitwiseWithNonWorkingAutocast(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
@@ -124,7 +124,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
}
@Test
- def testCaseInsensitiveForAs: Unit = {
+ def testCaseInsensitiveForAs(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 982a302..bc51a7e 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -24,8 +24,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -47,12 +47,12 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testAllRejectingFilter: Unit = {
+ def testAllRejectingFilter(): Unit = {
/*
* Test all-rejecting filter.
*/
@@ -67,7 +67,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test
- def testAllPassingFilter: Unit = {
+ def testAllPassingFilter(): Unit = {
/*
* Test all-passing filter.
*/
@@ -87,7 +87,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test
- def testFilterOnStringTupleField: Unit = {
+ def testFilterOnStringTupleField(): Unit = {
/*
* Test filter on String tuple field.
*/
@@ -100,7 +100,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test
- def testFilterOnIntegerTupleField: Unit = {
+ def testFilterOnIntegerTupleField(): Unit = {
/*
* Test filter on Integer tuple field.
*/
@@ -120,7 +120,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
@Ignore
@Test
- def testFilterBasicType: Unit = {
+ def testFilterBasicType(): Unit = {
/*
* Test filter on basic type
*/
@@ -137,7 +137,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
@Ignore
@Test
- def testFilterOnCustomType: Unit = {
+ def testFilterOnCustomType(): Unit = {
/*
* Test filter on custom type
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 1f29722..d76d75c 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test(expected = classOf[ExpressionException])
- def testGroupingOnNonExistentField: Unit = {
+ def testGroupingOnNonExistentField(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
@@ -63,7 +63,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
}
@Test
- def testGroupedAggregate: Unit = {
+ def testGroupedAggregate(): Unit = {
// the grouping key needs to be forwarded to the intermediate DataSet, even
// if we don't want the key in the output
@@ -79,7 +79,7 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
}
@Test
- def testGroupingKeyForwardIfNotUsed: Unit = {
+ def testGroupingKeyForwardIfNotUsed(): Unit = {
// the grouping key needs to be forwarded to the intermediate DataSet, even
// if we don't want the key in the output
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 17221d8..b3baa56 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testJoin: Unit = {
+ def testJoin(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -63,7 +63,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test
- def testJoinWithFilter: Unit = {
+ def testJoinWithFilter(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -76,7 +76,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test
- def testJoinWithMultipleKeys: Unit = {
+ def testJoinWithMultipleKeys(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -90,7 +90,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test(expected = classOf[ExpressionException])
- def testJoinNonExistingKey: Unit = {
+ def testJoinNonExistingKey(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -103,7 +103,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test(expected = classOf[ExpressionException])
- def testJoinWithNonMatchingKeyTypes: Unit = {
+ def testJoinWithNonMatchingKeyTypes(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
@@ -116,7 +116,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test(expected = classOf[ExpressionException])
- def testJoinWithAmbiguousFields: Unit = {
+ def testJoinWithAmbiguousFields(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
@@ -129,7 +129,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
}
@Test
- def testJoinWithAggregation: Unit = {
+ def testJoinWithAggregation(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 6ba6c9f..1a13d93 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -45,12 +45,12 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testSimpleSelectAll: Unit = {
+ def testSimpleSelectAll(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).toTable.select('_1, '_2, '_3)
@@ -66,7 +66,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test
- def testSimpleSelectAllWithAs: Unit = {
+ def testSimpleSelectAllWithAs(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
@@ -82,7 +82,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test
- def testSimpleSelectWithNaming: Unit = {
+ def testSimpleSelectWithNaming(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).toTable
@@ -97,7 +97,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test(expected = classOf[ExpressionException])
- def testAsWithToFewFields: Unit = {
+ def testAsWithToFewFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
@@ -108,7 +108,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test(expected = classOf[ExpressionException])
- def testAsWithToManyFields: Unit = {
+ def testAsWithToManyFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
@@ -119,7 +119,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
}
@Test(expected = classOf[ExpressionException])
- def testAsWithAmbiguousFields: Unit = {
+ def testAsWithAmbiguousFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
@@ -131,7 +131,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
@Test(expected = classOf[ExpressionException])
- def testOnlyFieldRefInAs: Unit = {
+ def testOnlyFieldRefInAs(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index 3f0f46f..65fe77a 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.table.ExpressionException
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
@@ -44,12 +44,12 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
}
@After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
+ def after(): Unit = {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
- def testSubstring: Unit = {
+ def testSubstring(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
.select('a.substring(0, 'b))
@@ -60,7 +60,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
}
@Test
- def testSubstringWithMaxEnd: Unit = {
+ def testSubstringWithMaxEnd(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
.select('a.substring('b))
@@ -71,7 +71,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
}
@Test(expected = classOf[ExpressionException])
- def testNonWorkingSubstring1: Unit = {
+ def testNonWorkingSubstring1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
@@ -83,7 +83,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT
}
@Test(expected = classOf[ExpressionException])
- def testNonWorkingSubstring2: Unit = {
+ def testNonWorkingSubstring2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
deleted file mode 100644
index ef81dfe..0000000
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-/**
- * Abstract base class for unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup and
- * shutdown of the Flink clusters (including actor systems, etc) usually dominates
- * the execution of the actual tests.
- *
- * To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the ExecutionEnvironment from
- * the context:
- *
- * <pre>{@code
- *
- * @Test
- * public void someTest() {
- * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- * // test code
- * env.execute();
- * }
- *
- * @Test
- * public void anotherTest() {
- * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- * // test code
- * env.execute();
- * }
- *
- * }</pre>
- */
-public abstract class AbstractMultipleProgramsTestBase extends TestBaseUtils {
-
- /**
- * Enum that defines which execution environment to run the next test on:
- * An embedded local flink cluster, or the collection execution backend.
- */
- public enum TestExecutionMode {
- CLUSTER,
- COLLECTION
- }
-
- // -----------------------------------------------------------------------------------------...
-
- private static final int DEFAULT_PARALLELISM = 4;
-
- protected static ForkableFlinkMiniCluster cluster = null;
-
- protected transient TestExecutionMode mode;
-
- public AbstractMultipleProgramsTestBase(TestExecutionMode mode){
- this.mode = mode;
- }
-
- @BeforeClass
- public static void setup() throws Exception{
- cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
- }
-
- @AfterClass
- public static void teardown() throws Exception {
- stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c8b79ef..a5a7da9 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
@@ -36,15 +37,16 @@ public abstract class AbstractTestBase extends TestBaseUtils {
protected final Configuration config;
- protected ForkableFlinkMiniCluster executor;
-
private final List<File> tempFiles;
+ private final FiniteDuration timeout;
+
protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
protected int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
-
- private final FiniteDuration timeout;
+
+ protected ForkableFlinkMiniCluster executor;
+
public AbstractTestBase(Configuration config) {
this.config = config;
@@ -57,8 +59,8 @@ public abstract class AbstractTestBase extends TestBaseUtils {
// Local Test Cluster Life Cycle
// --------------------------------------------------------------------------------------------
- public void startCluster() throws Exception{
- this.executor = startCluster(numTaskManagers, taskManagerNumSlots, false);
+ public void startCluster() throws Exception {
+ this.executor = startCluster(numTaskManagers, taskManagerNumSlots, StreamingMode.BATCH_ONLY, false);
}
public void stopCluster() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index e0c4360..8dab485 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,6 +18,9 @@
package org.apache.flink.test.util;
+import org.apache.flink.runtime.StreamingMode;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.runners.Parameterized;
import java.util.Arrays;
@@ -51,11 +54,36 @@ import java.util.Collection;
*
* }</pre>
*/
-public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+public class MultipleProgramsTestBase extends TestBaseUtils {
+ /**
+ * Enum that defines which execution environment to run the next test on:
+ * An embedded local flink cluster, or the collection execution backend.
+ */
+ public enum TestExecutionMode {
+ CLUSTER,
+ COLLECTION
+ }
+
+ // ------------------------------------------------------------------------
+ // The mini cluster that is shared across tests
+ // ------------------------------------------------------------------------
+
+ protected static final int DEFAULT_PARALLELISM = 4;
+
+ protected static boolean startWebServer = false;
+
+ protected static ForkableFlinkMiniCluster cluster = null;
+
+ // ------------------------------------------------------------------------
+
+ protected final TestExecutionMode mode;
+
+
public MultipleProgramsTestBase(TestExecutionMode mode){
- super(mode);
- switch(this.mode){
+ this.mode = mode;
+
+ switch(mode){
case CLUSTER:
TestEnvironment clusterEnv = new TestEnvironment(cluster, 4);
clusterEnv.setAsContext();
@@ -67,6 +95,24 @@ public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
}
}
+ // ------------------------------------------------------------------------
+ // Cluster setup & teardown
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void setup() throws Exception{
+ cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.BATCH_ONLY, startWebServer);
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+ }
+
+ // ------------------------------------------------------------------------
+ // Parametrization lets the tests run in cluster and collection mode
+ // ------------------------------------------------------------------------
+
@Parameterized.Parameters(name = "Execution mode = {0}")
public static Collection<TestExecutionMode[]> executionModes(){
return Arrays.asList(new TestExecutionMode[]{TestExecutionMode.CLUSTER},
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index b0e4ae9..29cb2a4 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.hadoop.fs.FileSystem;
@@ -79,46 +80,55 @@ public class TestBaseUtils {
protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
- protected static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration
- (DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+ protected static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+ // ------------------------------------------------------------------------
+
protected static File logDir;
protected TestBaseUtils(){
verifyJvmOptions();
}
-
- private void verifyJvmOptions() {
+
+ private static void verifyJvmOptions() {
long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
-
- protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers, int
- taskManagerNumSlots, boolean startWebserver) throws Exception {
+
+
+ protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers,
+ int taskManagerNumSlots,
+ StreamingMode mode,
+ boolean startWebserver) throws Exception {
+
logDir = File.createTempFile("TestBaseUtils-logdir", null);
Assert.assertTrue("Unable to delete temp file", logDir.delete());
Assert.assertTrue("Unable to create temp directory", logDir.mkdir());
+
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
- config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
+
+ config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
- config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
+
config.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, startWebserver);
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString());
- return new ForkableFlinkMiniCluster(config);
+
+ return new ForkableFlinkMiniCluster(config, true, mode);
}
- protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout)
- throws Exception {
- if(logDir != null) {
- logDir.delete();
+ protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+ if (logDir != null) {
+ FileUtils.deleteDirectory(logDir);
}
- if(executor != null) {
+ if (executor != null) {
int numUnreleasedBCVars = 0;
int numActiveConnections = 0;
{
@@ -169,11 +179,11 @@ public class TestBaseUtils {
// Result Checking
// --------------------------------------------------------------------------------------------
- public BufferedReader[] getResultReader(String resultPath) throws IOException {
+ public static BufferedReader[] getResultReader(String resultPath) throws IOException {
return getResultReader(resultPath, new String[]{}, false);
}
- public BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
+ public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
boolean inOrderOfFiles) throws IOException {
File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
@@ -206,12 +216,11 @@ public class TestBaseUtils {
- public BufferedInputStream[] getResultInputStream(String resultPath) throws
- IOException {
+ public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
return getResultInputStream(resultPath, new String[]{});
}
- public BufferedInputStream[] getResultInputStream(String resultPath, String[]
+ public static BufferedInputStream[] getResultInputStream(String resultPath, String[]
excludePrefixes) throws IOException {
File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
BufferedInputStream[] inStreams = new BufferedInputStream[files.length];
@@ -221,37 +230,37 @@ public class TestBaseUtils {
return inStreams;
}
- public void readAllResultLines(List<String> target, String resultPath) throws
- IOException {
+ public static void readAllResultLines(List<String> target, String resultPath) throws IOException {
readAllResultLines(target, resultPath, new String[]{});
}
- public void readAllResultLines(List<String> target, String resultPath, String[]
- excludePrefixes) throws IOException {
+ public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes)
+ throws IOException {
+
readAllResultLines(target, resultPath, excludePrefixes, false);
}
- public void readAllResultLines(List<String> target, String resultPath, String[]
+ public static void readAllResultLines(List<String> target, String resultPath, String[]
excludePrefixes, boolean inOrderOfFiles) throws IOException {
for (BufferedReader reader : getResultReader(resultPath, excludePrefixes, inOrderOfFiles)) {
- String s = null;
+ String s;
while ((s = reader.readLine()) != null) {
target.add(s);
}
}
}
- public void compareResultsByLinesInMemory(String expectedResultStr, String
+ public static void compareResultsByLinesInMemory(String expectedResultStr, String
resultPath) throws Exception {
compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[]{});
}
- public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
+ public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
String[] excludePrefixes) throws Exception {
ArrayList<String> list = new ArrayList<String>();
readAllResultLines(list, resultPath, excludePrefixes, false);
- String[] result = (String[]) list.toArray(new String[list.size()]);
+ String[] result = list.toArray(new String[list.size()]);
Arrays.sort(result);
String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
@@ -261,18 +270,18 @@ public class TestBaseUtils {
Assert.assertArrayEquals(expected, result);
}
- public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
+ public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
String resultPath) throws
Exception {
compareResultsByLinesInMemoryWithStrictOrder(expectedResultStr, resultPath, new String[]{});
}
- public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
+ public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
String resultPath, String[] excludePrefixes) throws Exception {
ArrayList<String> list = new ArrayList<String>();
readAllResultLines(list, resultPath, excludePrefixes, true);
- String[] result = (String[]) list.toArray(new String[list.size()]);
+ String[] result = list.toArray(new String[list.size()]);
String[] expected = expectedResultStr.split("\n");
@@ -280,7 +289,7 @@ public class TestBaseUtils {
Assert.assertArrayEquals(expected, result);
}
- public void checkLinesAgainstRegexp(String resultPath, String regexp){
+ public static void checkLinesAgainstRegexp(String resultPath, String regexp){
Pattern pattern = Pattern.compile(regexp);
Matcher matcher = pattern.matcher("");
@@ -301,17 +310,17 @@ public class TestBaseUtils {
}
- public void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
+ public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
String delimiter, double maxDelta) throws Exception {
compareKeyValueParisWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
}
- public void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
+ public static void compareKeyValueParisWithDelta(String expectedLines, String resultPath,
String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
ArrayList<String> list = new ArrayList<String>();
readAllResultLines(list, resultPath, excludePrefixes, false);
- String[] result = (String[]) list.toArray(new String[list.size()]);
+ String[] result = list.toArray(new String[list.size()]);
String[] expected = expectedLines.isEmpty() ? new String[0] : expectedLines.split("\n");
Assert.assertEquals("Wrong number of result lines.", expected.length, result.length);
@@ -330,7 +339,7 @@ public class TestBaseUtils {
}
}
- public <X> void compareResultCollections(List<X> expected, List<X> actual,
+ public static <X> void compareResultCollections(List<X> expected, List<X> actual,
Comparator<X> comparator) {
Assert.assertEquals(expected.size(), actual.size());
@@ -445,8 +454,8 @@ public class TestBaseUtils {
protected static void deleteRecursively(File f) throws IOException {
if (f.isDirectory()) {
FileUtils.deleteDirectory(f);
- } else {
- f.delete();
+ } else if (!f.delete()) {
+ System.err.println("Failed to delete file " + f.getAbsolutePath());
}
}
@@ -469,13 +478,13 @@ public class TestBaseUtils {
// Web utils
//---------------------------------------------------------------------------------------------
- public static String getFromHTTP(String url) throws Exception{
+ public static String getFromHTTP(String url) throws Exception {
URL u = new URL(url);
LOG.info("Accessing URL "+url+" as URL: "+u);
HttpURLConnection connection = (HttpURLConnection) u.openConnection();
connection.setConnectTimeout(100000);
connection.connect();
- InputStream is = null;
+ InputStream is;
if(connection.getResponseCode() >= 400) {
// error!
LOG.warn("HTTP Response code when connecting to {} was {}", url, connection.getResponseCode());
@@ -486,5 +495,4 @@ public class TestBaseUtils {
return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04377224/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
index 2e664c1..3ea8624 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
@@ -18,6 +18,7 @@
package org.apache.flink.test.util
+import org.apache.flink.runtime.StreamingMode
import org.scalatest.{Suite, BeforeAndAfter}
/** Mixin to start and stop a ForkableFlinkMiniCluster automatically for Scala based tests.
@@ -46,17 +47,16 @@ import org.scalatest.{Suite, BeforeAndAfter}
* }}}
*
*/
-trait FlinkTestBase
- extends BeforeAndAfter {
+trait FlinkTestBase extends BeforeAndAfter {
that: Suite =>
var cluster: Option[ForkableFlinkMiniCluster] = None
val parallelism = 4
before {
- val cl = TestBaseUtils.startCluster(1, parallelism, false)
+ val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false)
val clusterEnvironment = new TestEnvironment(cl, parallelism)
- clusterEnvironment.setAsContext
+ clusterEnvironment.setAsContext()
cluster = Some(cl)
}