You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 11:54:13 UTC

[2/9] flink git commit: [streaming] [storm] Clean up instantiation of mini clusters and test environments.

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
index a02b536..131937e 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
@@ -73,7 +73,7 @@ public class RemoteTezEnvironment extends ExecutionEnvironment {
 
 	public RemoteTezEnvironment() {
 		compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
-		executor = new TezExecutor(compiler, this.getDegreeOfParallelism());
+		executor = new TezExecutor(compiler, getParallelism());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
index 70c5492..e3c6f1b 100644
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
+++ b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.tez.test;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.tez.client.LocalTezEnvironment;
 import org.junit.Assert;
@@ -40,7 +41,7 @@ public abstract class TezProgramTestBase extends AbstractTestBase {
     }
 
     public TezProgramTestBase(Configuration config) {
-        super (config);
+        super (config, StreamingMode.BATCH_ONLY);
     }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 1e85c71..005382a 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -28,28 +28,36 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 
-
+/**
+ * A base class for tests that run test programs in a Flink mini cluster.
+ */
 public abstract class AbstractTestBase extends TestBaseUtils {
-
-
+	
+	/** Configuration to start the testing cluster with */
 	protected final Configuration config;
-
+	
 	private final List<File> tempFiles;
-
-	private final FiniteDuration timeout;
 	
-	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+	private final FiniteDuration timeout;
 
-	protected int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
+	/** Mode (batch-only / streaming) in which to start the system */
+	private final StreamingMode streamingMode;
+
+	protected int taskManagerNumSlots = 1;
+
+	protected int numTaskManagers = 1;
 	
+	/** The mini cluster that runs the test programs */
 	protected ForkableFlinkMiniCluster executor;
 	
 
-	public AbstractTestBase(Configuration config) {
-		this.config = config;
+	public AbstractTestBase(Configuration config, StreamingMode streamingMode) {
+		this.config = Objects.requireNonNull(config);
+		this.streamingMode = Objects.requireNonNull(streamingMode);
 		this.tempFiles = new ArrayList<File>();
 
 		timeout = AkkaUtils.getTimeout(config);
@@ -59,11 +67,11 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 	//  Local Test Cluster Life Cycle
 	// --------------------------------------------------------------------------------------------
 
-	public void startCluster() throws Exception{
+	public void startCluster() throws Exception {
 		this.executor = startCluster(
 			numTaskManagers,
 			taskManagerNumSlots,
-			StreamingMode.BATCH_ONLY,
+			streamingMode,
 			false,
 			false,
 			true);
@@ -71,7 +79,6 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 
 	public void stopCluster() throws Exception {
 		stopCluster(executor, timeout);
-
 		deleteAllTempFiles();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index e639c80..f2de650 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -22,6 +22,7 @@ import java.util.Comparator;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -50,7 +51,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	}
 	
 	public JavaProgramTestBase(Configuration config) {
-		super(config);
+		super(config, StreamingMode.BATCH_ONLY);
 		setTaskManagerNumSlots(parallelism);
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index bd5400d..70eeffd 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import org.junit.Assert;
@@ -46,7 +47,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
 	}
 	
 	public RecordAPITestBase(Configuration config) {
-		super(config);
+		super(config, StreamingMode.BATCH_ONLY);
 		setTaskManagerNumSlots(parallelism);
 	}
 	
@@ -67,10 +68,11 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
 	
 	protected JobGraph getJobGraph() throws Exception {
 		Plan p = getTestJob();
-		p.setExecutionConfig(new ExecutionConfig());
 		if (p == null) {
-			Assert.fail("Error: Cannot obtain Pact plan. Did the test forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
+			Assert.fail("Error: Cannot obtain plan. Did the test forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
 		}
+
+		p.setExecutionConfig(new ExecutionConfig());
 		
 		Optimizer pc = new Optimizer(new DataStatistics(), this.config);
 		OptimizedPlan op = pc.compile(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a2c7b93..49cc68b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -78,10 +78,6 @@ public class TestBaseUtils extends TestLogger {
 
 	protected static final long TASK_MANAGER_MEMORY_SIZE = 80;
 
-	protected static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
-
-	protected static final int DEFAULT_NUM_TASK_MANAGERS = 1;
-
 	protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000;
 
 	protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
@@ -118,7 +114,7 @@ public class TestBaseUtils extends TestLogger {
 		
 		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver);
 
-		if(startZooKeeper) {
+		if (startZooKeeper) {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
 			config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
 		}
@@ -159,7 +155,8 @@ public class TestBaseUtils extends TestLogger {
 		if (executor != null) {
 			int numUnreleasedBCVars = 0;
 			int numActiveConnections = 0;
-			{
+			
+			if (executor.running()) {
 				List<ActorRef> tms = executor.getTaskManagersAsJava();
 				List<Future<Object>> bcVariableManagerResponseFutures = new ArrayList<Future<Object>>();
 				List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<Future<Object>>();

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
index f4b3875..a185135 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.javaApiOperators;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -22,44 +23,26 @@ import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
-
 /**
  * Test ExecutionEnvironment from user perspective
  */
 @SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
+public class ExecutionEnvironmentITCase {
+	
 	private static final int PARALLELISM = 5;
 
-	public ExecutionEnvironmentITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Parameterized.Parameters(name = "Execution mode = {0}")
-	public static Collection<TestExecutionMode[]> executionModes(){
-		Collection<TestExecutionMode[]> c = new ArrayList<TestExecutionMode[]>(1);
-		c.add(new TestExecutionMode[] {TestExecutionMode.CLUSTER});
-		return c;
-	}
-
-
 	/**
 	 * Ensure that the user can pass a custom configuration object to the LocalEnvironment
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index af51ed6..e976c23 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -58,7 +58,7 @@ public class IPv6HostnamesITCase extends TestLogger {
 
 		final Inet6Address ipv6address = getLocalIPv6Address();
 		if (ipv6address == null) {
-			System.err.println("--- Cannot find a non-loopback local IPv6 address, skipping IPv6HostnamesITCase");
+			System.err.println("--- Cannot find a non-loopback local IPv6 address that Akka/Netty can bind to; skipping IPv6HostnamesITCase");
 			return;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
index a5f1cbb..d50186e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
@@ -53,7 +53,8 @@ class BatchScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
        "org.apache.flink.api.java.operators.TwoInputOperator.getInput2",
        "org.apache.flink.api.java.operators.TwoInputOperator.getInput1Type",
        "org.apache.flink.api.java.operators.TwoInputOperator.getInput2Type",
-       "org.apache.flink.api.java.ExecutionEnvironment.localExecutionIsAllowed",
+       "org.apache.flink.api.java.ExecutionEnvironment.areExplicitEnvironmentsAllowed",
+       "org.apache.flink.api.java.ExecutionEnvironment.resetContextEnvironment",
        "org.apache.flink.api.java.ExecutionEnvironment.setDefaultLocalParallelism",
 
        // TypeHints are only needed for Java API, Scala API doesn't need them