You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 11:54:13 UTC
[2/9] flink git commit: [streaming] [storm] Clean up instantiation of
mini clusters and test environments.
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
index a02b536..131937e 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
@@ -73,7 +73,7 @@ public class RemoteTezEnvironment extends ExecutionEnvironment {
public RemoteTezEnvironment() {
compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
- executor = new TezExecutor(compiler, this.getDegreeOfParallelism());
+ executor = new TezExecutor(compiler, getParallelism());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
index 70c5492..e3c6f1b 100644
--- a/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
+++ b/flink-staging/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.tez.test;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.tez.client.LocalTezEnvironment;
import org.junit.Assert;
@@ -40,7 +41,7 @@ public abstract class TezProgramTestBase extends AbstractTestBase {
}
public TezProgramTestBase(Configuration config) {
- super (config);
+ super (config, StreamingMode.BATCH_ONLY);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 1e85c71..005382a 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -28,28 +28,36 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import org.apache.flink.runtime.akka.AkkaUtils;
-
+/**
+ * A base class for tests that run test programs in a Flink mini cluster.
+ */
public abstract class AbstractTestBase extends TestBaseUtils {
-
-
+
+ /** Configuration to start the testing cluster with */
protected final Configuration config;
-
+
private final List<File> tempFiles;
-
- private final FiniteDuration timeout;
- protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+ private final FiniteDuration timeout;
- protected int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
+ /** Mode (batch-only / streaming) in which to start the system */
+ private final StreamingMode streamingMode;
+
+ protected int taskManagerNumSlots = 1;
+
+ protected int numTaskManagers = 1;
+ /** The mini cluster that runs the test programs */
protected ForkableFlinkMiniCluster executor;
- public AbstractTestBase(Configuration config) {
- this.config = config;
+ public AbstractTestBase(Configuration config, StreamingMode streamingMode) {
+ this.config = Objects.requireNonNull(config);
+ this.streamingMode = Objects.requireNonNull(streamingMode);
this.tempFiles = new ArrayList<File>();
timeout = AkkaUtils.getTimeout(config);
@@ -59,11 +67,11 @@ public abstract class AbstractTestBase extends TestBaseUtils {
// Local Test Cluster Life Cycle
// --------------------------------------------------------------------------------------------
- public void startCluster() throws Exception{
+ public void startCluster() throws Exception {
this.executor = startCluster(
numTaskManagers,
taskManagerNumSlots,
- StreamingMode.BATCH_ONLY,
+ streamingMode,
false,
false,
true);
@@ -71,7 +79,6 @@ public abstract class AbstractTestBase extends TestBaseUtils {
public void stopCluster() throws Exception {
stopCluster(executor, timeout);
-
deleteAllTempFiles();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index e639c80..f2de650 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -22,6 +22,7 @@ import java.util.Comparator;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.api.java.tuple.Tuple;
@@ -50,7 +51,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
}
public JavaProgramTestBase(Configuration config) {
- super(config);
+ super(config, StreamingMode.BATCH_ONLY);
setTaskManagerNumSlots(parallelism);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index bd5400d..70eeffd 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.junit.Assert;
@@ -46,7 +47,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
}
public RecordAPITestBase(Configuration config) {
- super(config);
+ super(config, StreamingMode.BATCH_ONLY);
setTaskManagerNumSlots(parallelism);
}
@@ -67,10 +68,11 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
protected JobGraph getJobGraph() throws Exception {
Plan p = getTestJob();
- p.setExecutionConfig(new ExecutionConfig());
if (p == null) {
- Assert.fail("Error: Cannot obtain Pact plan. Did the test forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
+ Assert.fail("Error: Cannot obtain plan. Did the test forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
}
+
+ p.setExecutionConfig(new ExecutionConfig());
Optimizer pc = new Optimizer(new DataStatistics(), this.config);
OptimizedPlan op = pc.compile(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a2c7b93..49cc68b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -78,10 +78,6 @@ public class TestBaseUtils extends TestLogger {
protected static final long TASK_MANAGER_MEMORY_SIZE = 80;
- protected static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
-
- protected static final int DEFAULT_NUM_TASK_MANAGERS = 1;
-
protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000;
protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
@@ -118,7 +114,7 @@ public class TestBaseUtils extends TestLogger {
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver);
- if(startZooKeeper) {
+ if (startZooKeeper) {
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
}
@@ -159,7 +155,8 @@ public class TestBaseUtils extends TestLogger {
if (executor != null) {
int numUnreleasedBCVars = 0;
int numActiveConnections = 0;
- {
+
+ if (executor.running()) {
List<ActorRef> tms = executor.getTaskManagersAsJava();
List<Future<Object>> bcVariableManagerResponseFutures = new ArrayList<Future<Object>>();
List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<Future<Object>>();
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
index f4b3875..a185135 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.test.javaApiOperators;
import org.apache.flink.api.common.ExecutionConfig;
@@ -22,44 +23,26 @@ import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import static org.junit.Assert.assertEquals;
-
/**
* Test ExecutionEnvironment from user perspective
*/
@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
+public class ExecutionEnvironmentITCase {
+
private static final int PARALLELISM = 5;
- public ExecutionEnvironmentITCase(TestExecutionMode mode) {
- super(mode);
- }
-
- @Parameterized.Parameters(name = "Execution mode = {0}")
- public static Collection<TestExecutionMode[]> executionModes(){
- Collection<TestExecutionMode[]> c = new ArrayList<TestExecutionMode[]>(1);
- c.add(new TestExecutionMode[] {TestExecutionMode.CLUSTER});
- return c;
- }
-
-
/**
* Ensure that the user can pass a custom configuration object to the LocalEnvironment
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index af51ed6..e976c23 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -58,7 +58,7 @@ public class IPv6HostnamesITCase extends TestLogger {
final Inet6Address ipv6address = getLocalIPv6Address();
if (ipv6address == null) {
- System.err.println("--- Cannot find a non-loopback local IPv6 address, skipping IPv6HostnamesITCase");
+ System.err.println("--- Cannot find a non-loopback local IPv6 address that Akka/Netty can bind to; skipping IPv6HostnamesITCase");
return;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
index a5f1cbb..d50186e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
@@ -53,7 +53,8 @@ class BatchScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
"org.apache.flink.api.java.operators.TwoInputOperator.getInput2",
"org.apache.flink.api.java.operators.TwoInputOperator.getInput1Type",
"org.apache.flink.api.java.operators.TwoInputOperator.getInput2Type",
- "org.apache.flink.api.java.ExecutionEnvironment.localExecutionIsAllowed",
+ "org.apache.flink.api.java.ExecutionEnvironment.areExplicitEnvironmentsAllowed",
+ "org.apache.flink.api.java.ExecutionEnvironment.resetContextEnvironment",
"org.apache.flink.api.java.ExecutionEnvironment.setDefaultLocalParallelism",
// TypeHints are only needed for Java API, Scala API doesn't need them