You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:28:36 UTC
[14/50] [abbrv] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 9bd8cc3..2738d22 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.runtime.leaderelection;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.PoisonPill;
-import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -35,14 +35,19 @@ import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
@@ -52,7 +57,6 @@ import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;
import java.io.File;
-import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -61,22 +65,20 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION();
- private static final File tempDirectory;
+ private static TestingServer zkServer;
- static {
- try {
- tempDirectory = org.apache.flink.runtime.testutils
- .CommonTestUtils.createTempDirectory();
- }
- catch (IOException e) {
- throw new RuntimeException("Test setup failed", e);
- }
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ zkServer = new TestingServer(true);
}
@AfterClass
public static void tearDown() throws Exception {
- if (tempDirectory != null) {
- FileUtils.deleteDirectory(tempDirectory);
+ if (zkServer != null) {
+ zkServer.close();
}
}
@@ -86,18 +88,19 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
*/
@Test
public void testTaskManagerRegistrationAtReelectedLeader() throws Exception {
- Configuration configuration = new Configuration();
+ File rootFolder = tempFolder.getRoot();
+
+ Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig(
+ zkServer.getConnectString(),
+ rootFolder.getPath());
int numJMs = 10;
int numTMs = 3;
- configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
- ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+ TestingCluster cluster = new TestingCluster(configuration);
try {
cluster.start();
@@ -137,14 +140,15 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
int numSlotsPerTM = 3;
int parallelism = numTMs * numSlotsPerTM;
- Configuration configuration = new Configuration();
+ File rootFolder = tempFolder.getRoot();
+
+ Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig(
+ zkServer.getConnectString(),
+ rootFolder.getPath());
- configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
- configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
- configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message
@@ -169,7 +173,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
final JobGraph graph = new JobGraph("Blocking test job", sender, receiver);
- final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+ final TestingCluster cluster = new TestingCluster(configuration);
ActorSystem clientActorSystem = null;
@@ -250,14 +254,14 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
boolean finished = false;
final ActorSystem clientActorSystem;
- final ForkableFlinkMiniCluster cluster;
+ final LocalFlinkMiniCluster cluster;
final JobGraph graph;
final Promise<JobExecutionResult> resultPromise = new Promise.DefaultPromise<>();
public JobSubmitterRunnable(
ActorSystem actorSystem,
- ForkableFlinkMiniCluster cluster,
+ LocalFlinkMiniCluster cluster,
JobGraph graph) {
this.clientActorSystem = actorSystem;
this.cluster = cluster;
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index d693aaa..2ed759d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TestLogger;
@@ -75,7 +75,7 @@ public class TimestampITCase extends TestLogger {
static MultiShotLatch latch;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
@Before
public void setupLatch() {
@@ -92,7 +92,7 @@ public class TimestampITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index fc90994..a8482ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -29,11 +29,11 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
@@ -62,7 +62,7 @@ public class WebFrontendITCase extends TestLogger {
private static final int NUM_TASK_MANAGERS = 2;
private static final int NUM_SLOTS = 4;
- private static ForkableFlinkMiniCluster cluster;
+ private static LocalFlinkMiniCluster cluster;
private static int port = -1;
@@ -86,7 +86,7 @@ public class WebFrontendITCase extends TestLogger {
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
- cluster = new ForkableFlinkMiniCluster(config, false);
+ cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
port = cluster.webMonitor().get().getServerPort();
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index ac661f3..1b2838d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,7 +21,6 @@ package org.apache.flink.api.scala.runtime.jobmanager
import akka.actor.{ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
@@ -30,8 +29,7 @@ import org.apache.flink.runtime.messages.Messages.Acknowledge
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated, NotifyWhenJobManagerTerminated}
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.test.util.ForkableFlinkMiniCluster
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -140,12 +138,12 @@ class JobManagerFailsITCase(_system: ActorSystem)
}
}
- def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+ def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
- val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+ val cluster = new TestingCluster(config, singleActorSystem = false)
cluster.start()
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 258f6df..3b39b3f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.api.scala.runtime.taskmanager
import akka.actor.{ActorSystem, Kill, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
import org.apache.flink.configuration.ConfigConstants
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -31,8 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.test.util.ForkableFlinkMiniCluster
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -100,7 +98,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val jobID = jobGraph.getJobID
- val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
+ val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
@@ -152,7 +150,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val jobID = jobGraph.getJobID
- val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
+ val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
val taskManagers = cluster.getTaskManagers
val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
@@ -239,11 +237,11 @@ class TaskManagerFailsITCase(_system: ActorSystem)
}
}
- def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+ def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
- new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+ new TestingCluster(config, singleActorSystem = false)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 8c211ef..ffdca36 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -48,6 +48,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
<!-- Needed for the streaming wordcount example -->
<dependency>
<groupId>org.apache.flink</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 0243012..31a3d98 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -48,7 +48,6 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/tools/maven/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/tools/maven/scalastyle-config.xml b/tools/maven/scalastyle-config.xml
index f7bb0d4..0f7f6bb 100644
--- a/tools/maven/scalastyle-config.xml
+++ b/tools/maven/scalastyle-config.xml
@@ -86,7 +86,7 @@
<!-- </check> -->
<check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters>
- <parameter name="maxParameters"><![CDATA[10]]></parameter>
+ <parameter name="maxParameters"><![CDATA[15]]></parameter>
</parameters>
</check>
<!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->