You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:17:47 UTC
[2/4] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 934a795..6abea2a 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -27,11 +27,11 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +92,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
- ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+ LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();
final int flinkPort = flink.getLeaderRPCPort();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
index ee415d1..29b3a3e 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -18,8 +18,9 @@
package org.apache.flink.streaming.api.scala
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.streaming.util.TestStreamEnvironment
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitSuiteLike
@@ -29,7 +30,7 @@ trait ScalaStreamingMultipleProgramsTestBase
with BeforeAndAfterAll {
val parallelism = 4
- var cluster: Option[ForkableFlinkMiniCluster] = None
+ var cluster: Option[LocalFlinkMiniCluster] = None
override protected def beforeAll(): Unit = {
val cluster = Some(
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 2ab52b5..18ecfde 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -79,153 +79,4 @@ under the License.
</dependency>
</dependencies>
-
- <build>
- <plugins>
- <!-- Scala Compiler -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.4</version>
- <executions>
- <!-- Run scala compiler in the process-resources phase, so that dependencies
- on scala classes can be resolved later in the (Java) compile phase -->
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
-
- <!-- Run scala compiler in the process-test-resources phase, so that
- dependencies on scala classes can be resolved later in the (Java) test-compile
- phase -->
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <jvmArgs>
- <jvmArg>-Xms128m</jvmArg>
- <jvmArg>-Xmx512m</jvmArg>
- </jvmArgs>
- </configuration>
- </plugin>
-
- <!-- Eclipse Integration -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.8</version>
- <configuration>
- <downloadSources>true</downloadSources>
- <projectnatures>
- <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
- <projectnature>org.eclipse.jdt.core.javanature</projectnature>
- </projectnatures>
- <buildcommands>
- <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
- </buildcommands>
- <classpathContainers>
- <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- </classpathContainers>
- <excludes>
- <exclude>org.scala-lang:scala-library</exclude>
- <exclude>org.scala-lang:scala-compiler</exclude>
- </excludes>
- <sourceIncludes>
- <sourceInclude>**/*.scala</sourceInclude>
- <sourceInclude>**/*.java</sourceInclude>
- </sourceIncludes>
- </configuration>
- </plugin>
-
- <!-- Adding scala source directories to build path -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <!-- Add src/main/scala to eclipse build path -->
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- </sources>
- </configuration>
- </execution>
- <!-- Add src/test/scala to eclipse build path -->
- <execution>
- <id>add-test-source</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- Scala Code Style, most of the configuration done via plugin management -->
- <plugin>
- <groupId>org.scalastyle</groupId>
- <artifactId>scalastyle-maven-plugin</artifactId>
- <configuration>
- <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
- </configuration>
- </plugin>
-
- </plugins>
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>
- net.alchim31.maven
- </groupId>
- <artifactId>
- scala-maven-plugin
- </artifactId>
- <versionRange>
- [3.1.4,)
- </versionRange>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index c5fbaf0..a478908 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -19,8 +19,8 @@
package org.apache.flink.streaming.util;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestBaseUtils;
import org.junit.AfterClass;
@@ -61,7 +61,7 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
protected static final int DEFAULT_PARALLELISM = 4;
- protected static ForkableFlinkMiniCluster cluster;
+ protected static LocalFlinkMiniCluster cluster;
public StreamingMultipleProgramsTestBase() {
super(new Configuration());
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index c700102..64c68dc 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Preconditions;
/**
@@ -32,10 +32,10 @@ import org.apache.flink.util.Preconditions;
public class TestStreamEnvironment extends StreamExecutionEnvironment {
/** The mini cluster in which this environment executes its jobs */
- private ForkableFlinkMiniCluster executor;
+ private LocalFlinkMiniCluster executor;
- public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+ public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
this.executor = Preconditions.checkNotNull(executor);
setParallelism(parallelism);
}
@@ -57,7 +57,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
* @param cluster The test cluster to run the test program on.
* @param parallelism The default parallelism for the test programs.
*/
- public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
+ public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c2da691..316fd21 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
@@ -48,7 +49,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
protected int numTaskManagers = 1;
/** The mini cluster that runs the test programs */
- protected ForkableFlinkMiniCluster executor;
+ protected LocalFlinkMiniCluster executor;
public AbstractTestBase(Configuration config) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index d7f09bd..4e83245 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.util;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runners.Parameterized;
@@ -72,7 +73,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
protected static boolean startWebServer = false;
- protected static ForkableFlinkMiniCluster cluster = null;
+ protected static LocalFlinkMiniCluster cluster = null;
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 4014b80..b774f97 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -32,7 +32,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.fs.FileSystem;
@@ -104,7 +105,7 @@ public class TestBaseUtils extends TestLogger {
}
- public static ForkableFlinkMiniCluster startCluster(
+ public static LocalFlinkMiniCluster startCluster(
int numTaskManagers,
int taskManagerNumSlots,
boolean startWebserver,
@@ -126,7 +127,7 @@ public class TestBaseUtils extends TestLogger {
return startCluster(config, singleActorSystem);
}
- public static ForkableFlinkMiniCluster startCluster(
+ public static LocalFlinkMiniCluster startCluster(
Configuration config,
boolean singleActorSystem) throws Exception {
@@ -147,7 +148,7 @@ public class TestBaseUtils extends TestLogger {
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
- ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, singleActorSystem);
+ LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, singleActorSystem);
cluster.start();
@@ -155,7 +156,7 @@ public class TestBaseUtils extends TestLogger {
}
- public static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+ public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
if (logDir != null) {
FileUtils.deleteDirectory(logDir);
}
@@ -169,11 +170,15 @@ public class TestBaseUtils extends TestLogger {
List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<>();
for (ActorRef tm : tms) {
- bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
- .RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout(timeout)));
-
- numActiveConnectionsResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
- .RequestNumActiveConnections$.MODULE$, new Timeout(timeout)));
+ bcVariableManagerResponseFutures.add(Patterns.ask(
+ tm,
+ TaskManagerMessages.getRequestBroadcastVariablesWithReferences(),
+ new Timeout(timeout)));
+
+ numActiveConnectionsResponseFutures.add(Patterns.ask(
+ tm,
+ TaskManagerMessages.getRequestNumActiveConnections(),
+ new Timeout(timeout)));
}
Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence(
@@ -182,8 +187,7 @@ public class TestBaseUtils extends TestLogger {
Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout);
for (Object response : responses) {
- numUnreleasedBCVars += ((TestingTaskManagerMessages
- .ResponseBroadcastVariablesWithReferences) response).number();
+ numUnreleasedBCVars += ((TaskManagerMessages.ResponseBroadcastVariablesWithReferences) response).number();
}
Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence(
@@ -192,8 +196,7 @@ public class TestBaseUtils extends TestLogger {
responses = Await.result(numActiveConnectionsFutureResponses, timeout);
for (Object response : responses) {
- numActiveConnections += ((TestingTaskManagerMessages
- .ResponseNumActiveConnections) response).number();
+ numActiveConnections += ((TaskManagerMessages.ResponseNumActiveConnections) response).number();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 7cb88be..aea8152 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -29,10 +29,11 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
public class TestEnvironment extends ExecutionEnvironment {
- private final ForkableFlinkMiniCluster executor;
+ private final LocalFlinkMiniCluster executor;
private TestEnvironment lastEnv = null;
@@ -46,7 +47,7 @@ public class TestEnvironment extends ExecutionEnvironment {
}
}
- public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+ public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
this.executor = executor;
setParallelism(parallelism);
@@ -54,7 +55,7 @@ public class TestEnvironment extends ExecutionEnvironment {
getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
}
- public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
+ public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
this(executor, parallelism);
if (isObjectReuseEnabled) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
deleted file mode 100644
index fa3135a..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util
-
-import java.util.concurrent.TimeoutException
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.Patterns._
-import akka.pattern.ask
-
-import org.apache.curator.test.TestingCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{JobManager, HighAvailabilityMode}
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager}
-import org.apache.flink.runtime.testutils.TestingResourceManager
-
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
-
-/**
- * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
- * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and
- * uses it to avoid port conflicts.
- *
- * @param userConfiguration Configuration object with the user provided configuration values
- * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
- * same [[ActorSystem]], otherwise false.
- */
-class ForkableFlinkMiniCluster(
- userConfiguration: Configuration,
- singleActorSystem: Boolean)
- extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
-
- def this(userConfiguration: Configuration) = this(userConfiguration, true)
-
- // --------------------------------------------------------------------------
-
- var zookeeperCluster: Option[TestingCluster] = None
-
- override def generateConfiguration(userConfiguration: Configuration): Configuration = {
- val forkNumberString = System.getProperty("forkNumber")
-
- val forkNumber = try {
- Integer.parseInt(forkNumberString)
- }
- catch {
- case e: NumberFormatException => -1
- }
-
- val config = userConfiguration.clone()
-
- if (forkNumber != -1) {
- val jobManagerRPC = 1024 + forkNumber*400
- val taskManagerRPC = 1024 + forkNumber*400 + 100
- val taskManagerData = 1024 + forkNumber*400 + 200
- val resourceManagerRPC = 1024 + forkNumber*400 + 300
-
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
- config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
- config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
- config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerRPC)
- }
-
- super.generateConfiguration(config)
- }
-
- override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
- val config = configuration.clone()
-
- val jobManagerName = getJobManagerName(index)
- val archiveName = getArchiveName(index)
-
- val jobManagerPort = config.getInteger(
- ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
- if (jobManagerPort > 0) {
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
- }
-
- val (jobManager, _) = JobManager.startJobManagerActors(
- config,
- actorSystem,
- Some(jobManagerName),
- Some(archiveName),
- classOf[TestingJobManager],
- classOf[TestingMemoryArchivist])
-
- jobManager
- }
-
- override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
-
- val resourceManagerName = getResourceManagerName(index)
-
- val resourceManagerPort = config.getInteger(
- ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
-
- if (resourceManagerPort > 0) {
- config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
- }
-
- val resourceManager = FlinkResourceManager.startResourceManagerActors(
- config,
- system,
- createLeaderRetrievalService(),
- classOf[TestingResourceManager],
- resourceManagerName)
-
- resourceManager
- }
-
- override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
- val config = configuration.clone()
-
- val rpcPort = config.getInteger(
- ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
-
- val dataPort = config.getInteger(
- ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
-
- if (rpcPort > 0) {
- config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
- }
- if (dataPort > 0) {
- config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
- }
-
- val localExecution = numTaskManagers == 1
-
- TaskManager.startTaskManagerComponentsAndActor(
- config,
- ResourceID.generate(),
- system,
- hostname,
- Some(TaskManager.TASK_MANAGER_NAME + index),
- Some(createLeaderRetrievalService()),
- localExecution,
- classOf[TestingTaskManager])
- }
-
- def addTaskManager(): Unit = {
- if (useSingleActorSystem) {
- (jobManagerActorSystems, taskManagerActors) match {
- case (Some(jmSystems), Some(tmActors)) =>
- val index = numTaskManagers
- taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0)))
- numTaskManagers += 1
- case _ => throw new IllegalStateException("Cluster has not been started properly.")
- }
- } else {
- (taskManagerActorSystems, taskManagerActors) match {
- case (Some(tmSystems), Some(tmActors)) =>
- val index = numTaskManagers
- val newTmSystem = startTaskManagerActorSystem(index)
- val newTmActor = startTaskManager(index, newTmSystem)
-
- taskManagerActorSystems = Some(tmSystems :+ newTmSystem)
- taskManagerActors = Some(tmActors :+ newTmActor)
-
- numTaskManagers += 1
- case _ => throw new IllegalStateException("Cluster has not been started properly.")
- }
- }
- }
-
- def restartLeadingJobManager(): Unit = {
- this.synchronized {
- (jobManagerActorSystems, jobManagerActors) match {
- case (Some(jmActorSystems), Some(jmActors)) =>
- val leader = getLeaderGateway(AkkaUtils.getTimeout(configuration))
- val index = getLeaderIndex(AkkaUtils.getTimeout(configuration))
-
- clearLeader()
-
- val stopped = gracefulStop(leader.actor(), ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
- Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-
- if(!singleActorSystem) {
- jmActorSystems(index).shutdown()
- jmActorSystems(index).awaitTermination()
- }
-
- val newJobManagerActorSystem = if(!singleActorSystem) {
- startJobManagerActorSystem(index)
- } else {
- jmActorSystems.head
- }
-
- val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
-
- jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
- jobManagerActorSystems = Some(jmActorSystems.patch(
- index,
- Seq(newJobManagerActorSystem),
- 1))
-
- val lrs = createLeaderRetrievalService()
-
- jobManagerLeaderRetrievalService = Some(lrs)
- lrs.start(this)
-
- case _ => throw new Exception("The JobManager of the ForkableFlinkMiniCluster have not " +
- "been started properly.")
- }
- }
- }
-
-
- def restartTaskManager(index: Int): Unit = {
- (taskManagerActorSystems, taskManagerActors) match {
- case (Some(tmActorSystems), Some(tmActors)) =>
- val stopped = gracefulStop(tmActors(index), ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
- Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-
- if(!singleActorSystem) {
- tmActorSystems(index).shutdown()
- tmActorSystems(index).awaitTermination()
- }
-
- val taskManagerActorSystem = if(!singleActorSystem) {
- startTaskManagerActorSystem(index)
- } else {
- tmActorSystems.head
- }
-
- val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
-
- taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
- taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
-
- case _ => throw new Exception("The TaskManager of the ForkableFlinkMiniCluster have not " +
- "been started properly.")
- }
- }
-
- override def start(): Unit = {
- val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "")
-
- zookeeperCluster = if (haMode == HighAvailabilityMode.ZOOKEEPER &&
- zookeeperURL.equals("")) {
- LOG.info("Starting ZooKeeper cluster.")
-
- val testingCluster = new TestingCluster(1)
-
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
- testingCluster.getConnectString)
-
- testingCluster.start()
-
- Some(testingCluster)
- } else {
- None
- }
-
- super.start()
- }
-
- override def stop(): Unit = {
- super.stop()
-
- zookeeperCluster.foreach{
- LOG.info("Stopping ZooKeeper cluster.")
- _.close()
- }
- }
-
- def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
- val futures = taskManagerActors.map {
- _.map {
- tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
- }
- }.getOrElse(Seq())
-
- try {
- Await.ready(Future.sequence(futures), timeout)
- } catch {
- case t: TimeoutException =>
- throw new Exception("Timeout while waiting for TaskManagers to register at " +
- s"${jobManager.path}")
- }
-
- }
-}
-
-object ForkableFlinkMiniCluster {
-
- val MAX_RESTART_DURATION = 2 minute
-
- val DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT = "200 s"
-
- def startCluster(
- numSlots: Int,
- numTaskManagers: Int,
- timeout: String = DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT)
- : ForkableFlinkMiniCluster = {
-
- val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
-
- val cluster = new ForkableFlinkMiniCluster(config)
-
- cluster.start()
-
- cluster
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index cac8451..cc70fee 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -44,7 +44,7 @@ import static org.junit.Assert.fail;
*/
public class AccumulatorErrorITCase {
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void startCluster() {
@@ -53,7 +53,7 @@ public class AccumulatorErrorITCase {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 49e18e0..624bfff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -234,7 +234,6 @@ public class AccumulatorLiveITCase {
fail("Wrong accumulator results when map task begins execution.");
}
-
int expectedAccVal = 0;
/* for mapper task */
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 9671fce..8a08f15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.cancelling;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import static org.apache.flink.runtime.taskmanager.TaskCancelTest.awaitRunning;
import static org.apache.flink.runtime.taskmanager.TaskCancelTest.cancelJob;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.util.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
@@ -65,7 +64,7 @@ public abstract class CancelingTestBase extends TestLogger {
// --------------------------------------------------------------------------------------------
- protected ForkableFlinkMiniCluster executor;
+ protected LocalFlinkMiniCluster executor;
protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
@@ -88,7 +87,7 @@ public abstract class CancelingTestBase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048);
- this.executor = new ForkableFlinkMiniCluster(config, false);
+ this.executor = new LocalFlinkMiniCluster(config, false);
this.executor.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 163fb42..94ff66f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -35,7 +36,6 @@ import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -60,7 +60,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
@@ -71,7 +71,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s");
config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index fa5339d..0aee128 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -41,7 +42,6 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -76,7 +76,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -95,7 +95,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 8915bff..7f1d7f3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -73,7 +73,7 @@ public class RescalingITCase extends TestLogger {
private static int slotsPerTaskManager = 2;
private static int numSlots = numTaskManagers * slotsPerTaskManager;
- private static ForkableFlinkMiniCluster cluster;
+ private static TestingCluster cluster;
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -92,7 +92,7 @@ public class RescalingITCase extends TestLogger {
config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
- cluster = new ForkableFlinkMiniCluster(config);
+ cluster = new TestingCluster(config);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 550ba75..7409fe7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointListener;
@@ -51,6 +50,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
@@ -62,8 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
@@ -76,7 +74,6 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -137,7 +134,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Created temporary directory: " + tmpDir + ".");
- ForkableFlinkMiniCluster flink = null;
+ TestingCluster flink = null;
try {
// Create a test actor system
@@ -168,7 +165,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Flink configuration: " + config + ".");
// Start Flink
- flink = new ForkableFlinkMiniCluster(config);
+ flink = new TestingCluster(config);
LOG.info("Starting Flink cluster.");
flink.start();
@@ -261,7 +258,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("JobManager: " + jobManager + ".");
final Throwable[] error = new Throwable[1];
- final ForkableFlinkMiniCluster finalFlink = flink;
+ final TestingCluster finalFlink = flink;
final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
new JavaTestKit(testActorSystem) {{
@@ -422,7 +419,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Created temporary directory: " + tmpDir + ".");
- ForkableFlinkMiniCluster flink = null;
+ TestingCluster flink = null;
List<File> checkpointFiles = new ArrayList<>();
try {
@@ -447,7 +444,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Flink configuration: " + config + ".");
// Start Flink
- flink = new ForkableFlinkMiniCluster(config);
+ flink = new TestingCluster(config);
LOG.info("Starting Flink cluster.");
flink.start();
@@ -559,7 +556,7 @@ public class SavepointITCase extends TestLogger {
// Test deadline
final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
- ForkableFlinkMiniCluster flink = null;
+ TestingCluster flink = null;
try {
// Flink configuration
@@ -570,7 +567,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Flink configuration: " + config + ".");
// Start Flink
- flink = new ForkableFlinkMiniCluster(config);
+ flink = new TestingCluster(config);
LOG.info("Starting Flink cluster.");
flink.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index cf15052..6bf511f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -80,7 +80,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
private static final int NUM_TASK_SLOTS = 3;
private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void startCluster() {
@@ -91,7 +91,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 ms");
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 67c05e5..5f6cd4a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -20,8 +20,8 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.TestLogger;
@@ -43,7 +43,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
protected static final int NUM_TASK_SLOTS = 4;
protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void startCluster() {
@@ -53,7 +53,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 2e6ce78..e424a8d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -71,7 +71,7 @@ public class WindowCheckpointingITCase extends TestLogger {
private static final int PARALLELISM = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
@@ -81,7 +81,7 @@ public class WindowCheckpointingITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 8b56d3d..7afafe4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.classloading;
-import akka.pattern.AskTimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
@@ -37,9 +36,9 @@ import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -82,7 +81,7 @@ public class ClassLoaderITCase extends TestLogger {
public static final TemporaryFolder FOLDER = new TemporaryFolder();
- private static ForkableFlinkMiniCluster testCluster;
+ private static TestingCluster testCluster;
private static int parallelism;
@@ -105,7 +104,7 @@ public class ClassLoaderITCase extends TestLogger {
config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY,
FOLDER.newFolder().getAbsoluteFile().toURI().toString());
- testCluster = new ForkableFlinkMiniCluster(config, false);
+ testCluster = new TestingCluster(config, false);
testCluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
index c9059f1..a74ed34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -42,7 +42,6 @@ import java.util.concurrent.Semaphore;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
-
/**
* Tests retrieval of a job from a running Flink cluster
*/
@@ -54,7 +53,7 @@ public class JobRetrievalITCase extends TestLogger {
@BeforeClass
public static void before() {
- cluster = new ForkableFlinkMiniCluster(new Configuration(), false);
+ cluster = new TestingCluster(new Configuration(), false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 28c2e58..178656d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public class JobSubmissionFailsITCase {
private static final int NUM_SLOTS = 20;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
private static JobGraph workingJobGraph;
@BeforeClass
@@ -58,7 +58,7 @@ public class JobSubmissionFailsITCase {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
- cluster = new ForkableFlinkMiniCluster(config);
+ cluster = new LocalFlinkMiniCluster(config);
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index ca2c156..133ebd0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
@@ -52,7 +52,7 @@ public class CustomDistributionITCase extends TestLogger {
// The mini cluster that is shared across tests
// ------------------------------------------------------------------------
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void setup() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 34a7eed..e18e82a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -23,11 +23,10 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -54,7 +53,7 @@ public class RemoteEnvironmentITCase {
private static final String VALID_STARTUP_TIMEOUT = "100 s";
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void setupCluster() {
@@ -62,7 +61,7 @@ public class RemoteEnvironmentITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 09b5e7e..a67e6ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -49,14 +49,14 @@ public class AutoParallelismITCase {
private static final int SLOTS_PER_TM = 7;
private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void setupCluster() {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index f30f61f..51f3534 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.types.Value;
import org.junit.AfterClass;
@@ -43,7 +43,7 @@ public class CustomSerializationITCase {
private static final int PARLLELISM = 5;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void startCluster() {
@@ -51,7 +51,7 @@ public class CustomSerializationITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 30);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 42419fb..06b93ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Collector;
@@ -52,7 +52,7 @@ import static org.junit.Assert.*;
@SuppressWarnings("serial")
public class MiscellaneousIssuesITCase {
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@BeforeClass
public static void startCluster() {
@@ -61,7 +61,7 @@ public class MiscellaneousIssuesITCase {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 12b7a68..a43bab6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -32,8 +32,8 @@ import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -43,7 +43,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
@Test
public void testSuccessfulProgramAfterFailure() {
- ForkableFlinkMiniCluster cluster = null;
+ LocalFlinkMiniCluster cluster = null;
try {
Configuration config = new Configuration();
@@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 840);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 40732df..b99858a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -55,6 +55,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseRunningTasks;
@@ -62,7 +63,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -108,7 +108,7 @@ public class QueryableStateITCase extends TestLogger {
* Shared between all the test. Make sure to have at least NUM_SLOTS
* available after your test finishes, e.g. cancel the job you submitted.
*/
- private static ForkableFlinkMiniCluster cluster;
+ private static TestingCluster cluster;
@BeforeClass
public static void setup() {
@@ -120,7 +120,7 @@ public class QueryableStateITCase extends TestLogger {
config.setInteger(ConfigConstants.QUERYABLE_STATE_CLIENT_NETWORK_THREADS, 1);
config.setInteger(ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, 1);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new TestingCluster(config, false);
cluster.start(true);
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
index 8a45d62..8a43ee4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -49,7 +49,7 @@ public class FastFailuresITCase extends TestLogger {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
- ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
+ LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
index 0c5d14b..a0d6b58 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.recovery;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.BeforeClass;
public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {
@@ -34,8 +34,8 @@ public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCas
config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, "1 second");
config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, "100 ms");
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
index 6355a8f..f09efc5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.recovery;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.BeforeClass;
public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
@@ -33,8 +33,8 @@ public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecover
config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 ms");
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
index 004340c..bf7c524 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.AfterClass;
import org.junit.Test;
@@ -42,7 +42,7 @@ import static org.junit.Assert.*;
@SuppressWarnings("serial")
public abstract class SimpleRecoveryITCaseBase {
- protected static ForkableFlinkMiniCluster cluster;
+ protected static LocalFlinkMiniCluster cluster;
@AfterClass
public static void teardownCluster() {
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 6c621ac..5d29905 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -65,7 +65,7 @@ public class TaskManagerFailureRecoveryITCase {
final int PARALLELISM = 4;
- ForkableFlinkMiniCluster cluster = null;
+ LocalFlinkMiniCluster cluster = null;
ActorSystem additionalSystem = null;
try {
@@ -78,7 +78,7 @@ public class TaskManagerFailureRecoveryITCase {
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 7710f06..0b008eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
@@ -63,7 +63,7 @@ public class IPv6HostnamesITCase extends TestLogger {
- ForkableFlinkMiniCluster flink = null;
+ LocalFlinkMiniCluster flink = null;
try {
final String addressString = ipv6address.getHostAddress();
log.info("Test will use IPv6 address " + addressString + " for connection tests");
@@ -75,7 +75,7 @@ public class IPv6HostnamesITCase extends TestLogger {
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
- flink = new ForkableFlinkMiniCluster(conf, false);
+ flink = new LocalFlinkMiniCluster(conf, false);
flink.start();
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort());