You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:28:36 UTC

[14/50] [abbrv] flink git commit: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 9bd8cc3..2738d22 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.runtime.leaderelection;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.PoisonPill;
-import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -35,14 +35,19 @@ import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -52,7 +57,6 @@ import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
 import java.io.File;
-import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -61,22 +65,20 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 	private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION();
 
-	private static final File tempDirectory;
+	private static TestingServer zkServer;
 
-	static {
-		try {
-			tempDirectory = org.apache.flink.runtime.testutils
-					.CommonTestUtils.createTempDirectory();
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Test setup failed", e);
-		}
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		zkServer = new TestingServer(true);
 	}
 
 	@AfterClass
 	public static void tearDown() throws Exception {
-		if (tempDirectory != null) {
-			FileUtils.deleteDirectory(tempDirectory);
+		if (zkServer != null) {
+			zkServer.close();
 		}
 	}
 
@@ -86,18 +88,19 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 	 */
 	@Test
 	public void testTaskManagerRegistrationAtReelectedLeader() throws Exception {
-		Configuration configuration = new Configuration();
+		File rootFolder = tempFolder.getRoot();
+
+		Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig(
+			zkServer.getConnectString(),
+			rootFolder.getPath());
 
 		int numJMs = 10;
 		int numTMs = 3;
 
-		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
-		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+		TestingCluster cluster = new TestingCluster(configuration);
 
 		try {
 			cluster.start();
@@ -137,14 +140,15 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		int numSlotsPerTM = 3;
 		int parallelism = numTMs * numSlotsPerTM;
 
-		Configuration configuration = new Configuration();
+		File rootFolder = tempFolder.getRoot();
+
+		Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig(
+			zkServer.getConnectString(),
+			rootFolder.getPath());
 
-		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
-		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
 		// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
 		// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message
@@ -169,7 +173,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 		final JobGraph graph = new JobGraph("Blocking test job", sender, receiver);
 
-		final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+		final TestingCluster cluster = new TestingCluster(configuration);
 
 		ActorSystem clientActorSystem = null;
 
@@ -250,14 +254,14 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		boolean finished = false;
 
 		final ActorSystem clientActorSystem;
-		final ForkableFlinkMiniCluster cluster;
+		final LocalFlinkMiniCluster cluster;
 		final JobGraph graph;
 
 		final Promise<JobExecutionResult> resultPromise = new Promise.DefaultPromise<>();
 
 		public JobSubmitterRunnable(
 				ActorSystem actorSystem,
-				ForkableFlinkMiniCluster cluster,
+				LocalFlinkMiniCluster cluster,
 				JobGraph graph) {
 			this.clientActorSystem = actorSystem;
 			this.cluster = cluster;

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index d693aaa..2ed759d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.util.TestLogger;
@@ -75,7 +75,7 @@ public class TimestampITCase extends TestLogger {
 	static MultiShotLatch latch;
 
 
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	@Before
 	public void setupLatch() {
@@ -92,7 +92,7 @@ public class TimestampITCase extends TestLogger {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
 
-			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index fc90994..a8482ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -29,11 +29,11 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 
 import org.apache.flink.util.TestLogger;
@@ -62,7 +62,7 @@ public class WebFrontendITCase extends TestLogger {
 	private static final int NUM_TASK_MANAGERS = 2;
 	private static final int NUM_SLOTS = 4;
 	
-	private static ForkableFlinkMiniCluster cluster;
+	private static LocalFlinkMiniCluster cluster;
 
 	private static int port = -1;
 	
@@ -86,7 +86,7 @@ public class WebFrontendITCase extends TestLogger {
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath());
 		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
 
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();
 		
 		port = cluster.webMonitor().get().getServerPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index ac661f3..1b2838d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,7 +21,6 @@ package org.apache.flink.api.scala.runtime.jobmanager
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
@@ -30,8 +29,7 @@ import org.apache.flink.runtime.messages.Messages.Acknowledge
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated, NotifyWhenJobManagerTerminated}
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.test.util.ForkableFlinkMiniCluster
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -140,12 +138,12 @@ class JobManagerFailsITCase(_system: ActorSystem)
     }
   }
 
-  def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+  def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
 
-    val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+    val cluster = new TestingCluster(config, singleActorSystem = false)
 
     cluster.start()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 258f6df..3b39b3f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.api.scala.runtime.taskmanager
 
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -31,8 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.test.util.ForkableFlinkMiniCluster
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -100,7 +98,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
 
-      val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
+      val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
 
       val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
 
@@ -152,7 +150,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
 
-      val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
+      val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
 
       val taskManagers = cluster.getTaskManagers
       val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
@@ -239,11 +237,11 @@ class TaskManagerFailsITCase(_system: ActorSystem)
     }
   }
 
-  def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+  def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
 
-    new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+    new TestingCluster(config, singleActorSystem = false)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 8c211ef..ffdca36 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -48,6 +48,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 		<!-- Needed for the streaming wordcount example -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 0243012..31a3d98 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -48,7 +48,6 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/tools/maven/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/tools/maven/scalastyle-config.xml b/tools/maven/scalastyle-config.xml
index f7bb0d4..0f7f6bb 100644
--- a/tools/maven/scalastyle-config.xml
+++ b/tools/maven/scalastyle-config.xml
@@ -86,7 +86,7 @@
  <!-- </check> -->
  <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
   <parameters>
-   <parameter name="maxParameters"><![CDATA[10]]></parameter>
+   <parameter name="maxParameters"><![CDATA[15]]></parameter>
   </parameters>
  </check>
  <!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->