You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 11:54:14 UTC
[3/9] flink git commit: [streaming] [storm] Clean up instantiation of
mini clusters and test environments.
[streaming] [storm] Clean up instantiation of mini clusters and test environments.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82d62361
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82d62361
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82d62361
Branch: refs/heads/master
Commit: 82d6236173093b7e035a21360c7b69c67fd6ae62
Parents: 891db5e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 30 00:12:00 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 11:02:39 2015 +0200
----------------------------------------------------------------------
.../OperatorStatsAccumulatorTest.java | 11 +-
.../api/FlinkLocalCluster.java | 88 +++++--
.../excamation/StormExclamationLocal.java | 9 +-
.../api/FlinkTestCluster.java | 107 --------
.../stormcompatibility/api/StormTestBase.java | 117 +++++++++
.../ExclamationWithStormBoltITCase.java | 4 +-
.../ExclamationWithStormSpoutITCase.java | 5 +-
.../StormExclamationLocalITCase.java | 7 +-
.../wordcount/BoltTokenizerWordCountITCase.java | 4 +-
.../BoltTokenizerWordCountPojoITCase.java | 4 +-
.../BoltTokenizerWordCountWithNamesITCase.java | 4 +-
.../wordcount/SpoutSourceWordCountITCase.java | 4 +-
.../wordcount/StormWordCountLocalITCase.java | 7 +-
.../StormWordCountLocalNamedITCase.java | 7 +-
.../flink/api/java/ExecutionEnvironment.java | 2 +-
.../runtime/minicluster/FlinkMiniCluster.scala | 7 +
.../java/org/apache/flink/tachyon/HDFSTest.java | 26 +-
.../api/java/ScalaShellRemoteEnvironment.java | 10 +-
.../org.apache.flink/api/scala/FlinkILoop.scala | 9 +-
.../api/environment/LocalStreamEnvironment.java | 78 ++++--
.../environment/RemoteStreamEnvironment.java | 52 +++-
.../environment/StreamExecutionEnvironment.java | 249 +++++++++++--------
.../flink/streaming/util/ClusterUtil.java | 96 -------
.../util/StreamingMultipleProgramsTestBase.java | 29 +--
.../util/StreamingProgramTestBase.java | 58 ++---
.../streaming/util/TestStreamEnvironment.java | 150 +++--------
.../TopSpeedWindowingExampleITCase.java | 2 +-
.../flink/tez/client/RemoteTezEnvironment.java | 2 +-
.../flink/tez/test/TezProgramTestBase.java | 3 +-
.../flink/test/util/AbstractTestBase.java | 33 ++-
.../flink/test/util/JavaProgramTestBase.java | 3 +-
.../flink/test/util/RecordAPITestBase.java | 8 +-
.../apache/flink/test/util/TestBaseUtils.java | 9 +-
.../ExecutionEnvironmentITCase.java | 25 +-
.../flink/test/runtime/IPv6HostnamesITCase.java | 2 +-
.../BatchScalaAPICompletenessTest.scala | 3 +-
36 files changed, 620 insertions(+), 614 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
index b9a8dc2..887c745 100644
--- a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
+++ b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
@@ -36,6 +37,7 @@ import java.io.Serializable;
import java.util.Map;
import java.util.Random;
+@SuppressWarnings("serial")
public class OperatorStatsAccumulatorTest extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorTest.class);
@@ -43,9 +45,9 @@ public class OperatorStatsAccumulatorTest extends AbstractTestBase {
private static final String ACCUMULATOR_NAME = "op-stats";
public OperatorStatsAccumulatorTest(){
- super(new Configuration());
+ super(new Configuration(), StreamingMode.BATCH_ONLY);
}
-
+
public static class StringToInt extends RichFlatMapFunction<String, Tuple1<Integer>> {
// Is instantiated later since the runtime context is not yet initialized
@@ -81,9 +83,8 @@ public class OperatorStatsAccumulatorTest extends AbstractTestBase {
try {
intValue = Integer.parseInt(value);
localAccumulator.add(intValue);
- out.collect(new Tuple1(intValue));
- } catch (NumberFormatException ex) {
- }
+ out.collect(new Tuple1<>(intValue));
+ } catch (NumberFormatException ignored) {}
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
index b5eda8b..c139201 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
@@ -25,15 +25,40 @@ import backtype.storm.generated.RebalanceOptions;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologyInfo;
-import org.apache.flink.streaming.util.ClusterUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
+import java.util.Objects;
/**
* {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
*/
public class FlinkLocalCluster {
+ /** The log used by this mini cluster */
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
+
+ /** The flink mini cluster on which to execute the programs */
+ private final FlinkMiniCluster flink;
+
+
+ public FlinkLocalCluster() {
+ this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
+ this.flink.start();
+ }
+
+ public FlinkLocalCluster(FlinkMiniCluster flink) {
+ this.flink = Objects.requireNonNull(flink);
+ }
+
public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
throws Exception {
this.submitTopologyWithOpts(topologyName, conf, topology, null);
@@ -41,7 +66,10 @@ public class FlinkLocalCluster {
public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
- ClusterUtil.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
+
+ LOG.info("Running Storm topology on FlinkLocalCluster");
+ JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
+ flink.submitJobDetached(jobGraph);
}
public void killTopology(final String topologyName) {
@@ -60,7 +88,9 @@ public class FlinkLocalCluster {
public void rebalance(final String name, final RebalanceOptions options) {
}
- public void shutdown() {}
+ public void shutdown() {
+ flink.stop();
+ }
public String getTopologyConf(final String id) {
return null;
@@ -82,31 +112,57 @@ public class FlinkLocalCluster {
return null;
}
+ // ------------------------------------------------------------------------
+ // Access to default local cluster
+ // ------------------------------------------------------------------------
+
// A different {@link FlinkLocalCluster} to be used for execution of ITCases
- private static FlinkLocalCluster currentCluster = null;
+ private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();
/**
- * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by {@link
- * #initialize(FlinkLocalCluster)} in advance, a new {@link FlinkLocalCluster} is returned.
+ * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
+ * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
*
* @return a {@link FlinkLocalCluster} to be used for execution
*/
public static FlinkLocalCluster getLocalCluster() {
- if (currentCluster == null) {
- currentCluster = new FlinkLocalCluster();
- }
-
- return currentCluster;
+ return currentFactory.createLocalCluster();
}
/**
- * Sets a different {@link FlinkLocalCluster} to be used for execution.
+ * Sets a different factory for FlinkLocalClusters to be used for execution.
*
- * @param cluster
- * the {@link FlinkLocalCluster} to be used for execution
+ * @param clusterFactory
+ * The LocalClusterFactory to create the local clusters for execution.
*/
- public static void initialize(final FlinkLocalCluster cluster) {
- currentCluster = cluster;
+ public static void initialize(LocalClusterFactory clusterFactory) {
+ currentFactory = Objects.requireNonNull(clusterFactory);
}
+
+ // ------------------------------------------------------------------------
+ // Cluster factory
+ // ------------------------------------------------------------------------
+ /**
+ * A factory that creates local clusters.
+ */
+ public static interface LocalClusterFactory {
+
+ /**
+ * Creates a local flink cluster.
+ * @return A local flink cluster.
+ */
+ FlinkLocalCluster createLocalCluster();
+ }
+
+ /**
+ * A factory that instantiates a FlinkLocalCluster.
+ */
+ public static class DefaultLocalClusterFactory implements LocalClusterFactory {
+
+ @Override
+ public FlinkLocalCluster createLocalCluster() {
+ return new FlinkLocalCluster();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
index 5941ff0..bd1220c 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
@@ -23,18 +23,19 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
/**
* Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
- * submitted to Flink for execution in the same way as to a Storm {@link LocalCluster}.
+ * files in a streaming fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}
+ * and submitted to Flink for execution in the same way as to a Storm {@link backtype.storm.LocalCluster}.
* <p/>
* This example shows how to run program directly within Java, thus it cannot be used to submit a
- * {@link StormTopology} via Flink command line clients (ie, bin/flink).
+ * {@link backtype.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink).
* <p/>
* <p/>
* The input is a plain text file with lines separated by newline characters.
* <p/>
* <p/>
* Usage: <code>StormExclamationLocal <text path> <result path></code><br/>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
* <p/>
* <p/>
* This example shows how to:
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
deleted file mode 100644
index 68f1216..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyInfo;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-
-import java.util.Map;
-
-/**
- * {@link FlinkTestCluster} mimics a Storm {@link LocalCluster} for ITCases via a {@link TestStreamEnvironment}.
- */
-public class FlinkTestCluster extends FlinkLocalCluster {
-
- @Override
- public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
- throws Exception {
- this.submitTopologyWithOpts(topologyName, conf, topology, null);
- }
-
- @Override
- public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
- final SubmitOptions submitOpts)
- throws Exception {
- final TestStreamEnvironment env = (TestStreamEnvironment) StreamExecutionEnvironment.getExecutionEnvironment();
- env.start(topology.getStreamGraph().getJobGraph(topologyName));
- }
-
- @Override
- public void killTopology(final String topologyName) {
- }
-
- @Override
- public void killTopologyWithOpts(final String name, final KillOptions options) {
- }
-
- @Override
- public void activate(final String topologyName) {
- }
-
- @Override
- public void deactivate(final String topologyName) {
- }
-
- @Override
- public void rebalance(final String name, final RebalanceOptions options) {
- }
-
- @Override
- public void shutdown() {
- final TestStreamEnvironment env = (TestStreamEnvironment) StreamExecutionEnvironment.getExecutionEnvironment();
- try {
- env.shutdown();
- } catch (final InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public String getTopologyConf(final String id) {
- return null;
- }
-
- @Override
- public StormTopology getTopology(final String id) {
- return null;
- }
-
- @Override
- public ClusterSummary getClusterInfo() {
- return null;
- }
-
- @Override
- public TopologyInfo getTopologyInfo(final String id) {
- return null;
- }
-
- @Override
- public Map<?, ?> getState() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java
new file mode 100644
index 0000000..dd6d0d9
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class for Storm tests.
+ */
+public abstract class StormTestBase extends AbstractTestBase {
+
+ public static final int DEFAULT_PARALLELISM = 4;
+
+ public StormTestBase() {
+ this(new Configuration());
+ }
+
+ public StormTestBase(Configuration config) {
+ super(config, StreamingMode.STREAMING);
+ setTaskManagerNumSlots(DEFAULT_PARALLELISM);
+ }
+
+ // ------------------------------------------------------------------------
+ // Methods to create the test program and for pre- and post- test work
+ // ------------------------------------------------------------------------
+
+ protected abstract void testProgram() throws Exception;
+
+ protected void preSubmit() throws Exception {}
+
+ protected void postSubmit() throws Exception {}
+
+ // ------------------------------------------------------------------------
+ // Test entry point
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testJob() throws Exception {
+ try {
+ // pre-submit
+ try {
+ preSubmit();
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Pre-submit work caused an error: " + e.getMessage());
+ }
+
+ // prepare the test environment
+ startCluster();
+
+ // we need to initialize the stream test environment, and the storm local cluster
+ TestStreamEnvironment.setAsContext(this.executor, DEFAULT_PARALLELISM);
+
+ FlinkLocalCluster.initialize(new FlinkLocalCluster.LocalClusterFactory() {
+ @Override
+ public FlinkLocalCluster createLocalCluster() {
+ return new FlinkLocalCluster(executor);
+ }
+ });
+
+ // call the test program
+ try {
+ testProgram();
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Error while calling the test program: " + e.getMessage());
+ }
+
+ // post-submit
+ try {
+ postSubmit();
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Post-submit work caused an error: " + e.getMessage());
+ }
+ }
+ finally {
+ // reset the FlinkLocalCluster to its default behavior
+ FlinkLocalCluster.initialize(new FlinkLocalCluster.DefaultLocalClusterFactory());
+
+ // reset the StreamExecutionEnvironment to its default behavior
+ TestStreamEnvironment.unsetAsContext();
+
+ // clean up all resources
+ stopCluster();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
index 930f87b..f47a58f 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
@@ -18,12 +18,12 @@
package org.apache.flink.stormcompatibility.exclamation;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormBolt;
import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
-public class ExclamationWithStormBoltITCase extends StreamingProgramTestBase {
+public class ExclamationWithStormBoltITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
index 4c515ce..2a8ac24 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
@@ -18,12 +18,12 @@
package org.apache.flink.stormcompatibility.exclamation;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormSpout;
import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
-public class ExclamationWithStormSpoutITCase extends StreamingProgramTestBase {
+public class ExclamationWithStormSpoutITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
@@ -43,5 +43,4 @@ public class ExclamationWithStormSpoutITCase extends StreamingProgramTestBase {
protected void testProgram() throws Exception {
ExclamationWithStormSpout.main(new String[]{this.textPath, this.resultPath});
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
index d6bcf30..6cba39a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
@@ -18,21 +18,18 @@
package org.apache.flink.stormcompatibility.exclamation;
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
import org.apache.flink.stormcompatibility.excamation.StormExclamationLocal;
import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
-public class StormExclamationLocalITCase extends StreamingProgramTestBase {
+public class StormExclamationLocalITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
@Override
protected void preSubmit() throws Exception {
- FlinkLocalCluster.initialize(new FlinkTestCluster());
this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
this.resultPath = this.getTempDirPath("result");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
index 9228474..c9516ff 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
@@ -18,10 +18,10 @@
package org.apache.flink.stormcompatibility.wordcount;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
import org.apache.flink.test.testdata.WordCountData;
-public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase {
+public class BoltTokenizerWordCountITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
index dc75c25..351014e 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
@@ -18,10 +18,10 @@
package org.apache.flink.stormcompatibility.wordcount;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
import org.apache.flink.test.testdata.WordCountData;
-public class BoltTokenizerWordCountPojoITCase extends StreamingProgramTestBase {
+public class BoltTokenizerWordCountPojoITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
index e147f53..c2ed088 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
@@ -18,10 +18,10 @@
package org.apache.flink.stormcompatibility.wordcount;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
import org.apache.flink.test.testdata.WordCountData;
-public class BoltTokenizerWordCountWithNamesITCase extends StreamingProgramTestBase {
+public class BoltTokenizerWordCountWithNamesITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
index 9d7b869..93361c5 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
@@ -18,10 +18,10 @@
package org.apache.flink.stormcompatibility.wordcount;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
import org.apache.flink.test.testdata.WordCountData;
-public class SpoutSourceWordCountITCase extends StreamingProgramTestBase {
+public class SpoutSourceWordCountITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
index 2427818..6b51cbd 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
@@ -18,19 +18,16 @@
package org.apache.flink.stormcompatibility.wordcount;
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
import org.apache.flink.test.testdata.WordCountData;
-public class StormWordCountLocalITCase extends StreamingProgramTestBase {
+public class StormWordCountLocalITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
@Override
protected void preSubmit() throws Exception {
- FlinkLocalCluster.initialize(new FlinkTestCluster());
this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
this.resultPath = this.getTempDirPath("result");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
index 8b9a729..a9e9884 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
@@ -18,19 +18,16 @@
package org.apache.flink.stormcompatibility.wordcount;
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
import org.apache.flink.test.testdata.WordCountData;
-public class StormWordCountLocalNamedITCase extends StreamingProgramTestBase {
+public class StormWordCountLocalNamedITCase extends StormTestBase {
protected String textPath;
protected String resultPath;
@Override
protected void preSubmit() throws Exception {
- FlinkLocalCluster.initialize(new FlinkTestCluster());
this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
this.resultPath = this.getTempDirPath("result");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 084e608..c69294d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -75,7 +75,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
/**
- * The ExecutionEnviroment is the context in which a program is executed. A
+ * The ExecutionEnvironment is the context in which a program is executed. A
* {@link LocalEnvironment} will cause execution in the current JVM, a
* {@link RemoteEnvironment} will cause execution on a remote setup.
* <p>
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index b3cff51..77e977f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -112,6 +112,8 @@ abstract class FlinkMiniCluster(
var taskManagerActors: Option[Seq[ActorRef]] = None
protected var leaderRetrievalService: Option[LeaderRetrievalService] = None
+
+ private var isRunning = false
// --------------------------------------------------------------------------
// Abstract Methods
@@ -271,6 +273,8 @@ abstract class FlinkMiniCluster(
if(waitForTaskManagerRegistration) {
waitForTaskManagersToBeRegistered()
}
+
+ isRunning = true
}
def startWebServer(
@@ -314,6 +318,7 @@ abstract class FlinkMiniCluster(
awaitTermination()
leaderRetrievalService.foreach(_.stop())
+ isRunning = false
}
protected def shutdown(): Unit = {
@@ -354,6 +359,8 @@ abstract class FlinkMiniCluster(
_ foreach(_.awaitTermination())
}
}
+
+ def running = isRunning
// --------------------------------------------------------------------------
// Utility Methods
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
index 633d022..5ec0add 100644
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -99,14 +99,21 @@ public class HDFSTest {
try {
FileSystem fs = file.getFileSystem();
Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
- new DopOneTestEnvironment();
+
+ DopOneTestEnvironment.setAsContext();
try {
WordCount.main(new String[]{file.toString(), result.toString()});
- } catch(Throwable t) {
+ }
+ catch(Throwable t) {
t.printStackTrace();
Assert.fail("Test failed with " + t.getMessage());
}
+ finally {
+ DopOneTestEnvironment.unsetAsContext();
+ }
+
Assert.assertTrue("No result file present", hdfs.exists(result));
+
// validate output:
org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
StringWriter writer = new StringWriter();
@@ -159,16 +166,23 @@ public class HDFSTest {
}
// package visible
- static final class DopOneTestEnvironment extends LocalEnvironment {
- static {
+ static abstract class DopOneTestEnvironment extends ExecutionEnvironment {
+
+ public static void setAsContext() {
+ final LocalEnvironment le = new LocalEnvironment();
+ le.setParallelism(1);
+
initializeContextEnvironment(new ExecutionEnvironmentFactory() {
+
@Override
public ExecutionEnvironment createExecutionEnvironment() {
- LocalEnvironment le = new LocalEnvironment();
- le.setParallelism(1);
return le;
}
});
}
+
+ public static void unsetAsContext() {
+ resetContextEnvironment();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index 859c686..cb37fd4 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -85,7 +85,11 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
return executor.executePlan(p);
}
- public void setAsContext() {
+ public static void disableAllContextAndOtherEnvironments() {
+
+ // we create a context environment that prevents the instantiation of further
+ // context environments. at the same time, setting the context environment prevents manual
+ // creation of local and remote environments
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
@@ -95,4 +99,8 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
};
initializeContextEnvironment(factory);
}
+
+ public static void resetContextEnvironments() {
+ ExecutionEnvironment.resetContextEnvironment();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
index 1e96ba3..cd8a846 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -53,8 +53,15 @@ class FlinkILoop(
}
// remote environment
private val remoteEnv: ScalaShellRemoteEnvironment = {
+ // allow creation of environments
+ ScalaShellRemoteEnvironment.resetContextEnvironments()
+
+ // create our environment that submits against the cluster (local or remote)
val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
- remoteEnv.setAsContext()
+
+ // prevent further instantiation of environments
+ ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
+
remoteEnv
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 4c002d1..f0bd174 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -17,30 +17,57 @@
package org.apache.flink.streaming.api.environment;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.ClusterUtil;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally,
+ * multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded
+ * Flink cluster in the background and executes the program on that cluster.
+ *
+ * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
+ * parallelism can be set via {@link #setParallelism(int)}.
+ *
+ * <p>Local environments can also be instantiated through {@link StreamExecutionEnvironment#createLocalEnvironment()}
+ * and {@link StreamExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
+ * default parallelism equal to the number of hardware contexts in the local machine.
+ */
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
+
+ /** The configuration to use for the local cluster */
private final Configuration conf;
+ /**
+ * Creates a new local stream environment that uses the default configuration.
+ */
public LocalStreamEnvironment() {
- conf = null;
- }
- public LocalStreamEnvironment(Configuration conf) {
- this.conf = conf;
+ this(null);
}
/**
- * Executes the JobGraph of the on a mini cluster of CLusterUtil with a
- * default name.
+ * Creates a new local stream environment that configures its local executor with the given configuration.
*
- * @return The result of the job execution, containing elapsed time and accumulators.
+ * @param config The configuration used to configure the local executor.
*/
- @Override
- public JobExecutionResult execute() throws Exception {
- return execute(DEFAULT_JOB_NAME);
+ public LocalStreamEnvironment(Configuration config) {
+ if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+ throw new InvalidProgramException(
+ "The LocalStreamEnvironment cannot be used when submitting a program through a client, " +
+ "or running in a TestEnvironment context.");
+ }
+
+ this.conf = config == null ? new Configuration() : config;
}
/**
@@ -53,9 +80,30 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- JobExecutionResult result = ClusterUtil.runOnMiniCluster(getStreamGraph().getJobGraph(),
- getParallelism(), -1, getConfig().isSysoutLoggingEnabled(), false, this.conf);
- transformations.clear();
- return result;
+ // transform the streaming program into a JobGraph
+ JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
+
+ Configuration configuration = new Configuration();
+ configuration.addAll(jobGraph.getJobConfiguration());
+
+ configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getParallelism());
+
+ // add (and override) the settings with what the user defined
+ configuration.addAll(this.conf);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Running job on local embedded Flink mini cluster");
+ }
+
+ LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true, StreamingMode.STREAMING);
+ try {
+ exec.start();
+ return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
+ }
+ finally {
+ transformations.clear();
+ exec.stop();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 29439f6..ccf51d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -22,7 +22,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
@@ -41,6 +43,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
private final String host;
private final int port;
private final List<File> jarFiles;
+
+ /** The configuration used to parametrize the client that connects to the remote cluster */
+ private final Configuration config;
/**
* Creates a new RemoteStreamEnvironment that points to the master
@@ -59,17 +64,46 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
* provided in the JAR files.
*/
public RemoteStreamEnvironment(String host, int port, String... jarFiles) {
+ this(host, port, null, jarFiles);
+ }
+
+ /**
+ * Creates a new RemoteStreamEnvironment that points to the master
+ * (JobManager) described by the given host name and port.
+ *
+ * @param host
+ * The host name or address of the master (JobManager), where the
+ * program should be executed.
+ * @param port
+ * The port of the master (JobManager), where the program should
+ * be executed.
+ * @param config
+ * The configuration used to parametrize the client that connects to the
+ * remote cluster.
+ * @param jarFiles
+ * The JAR files with code that needs to be shipped to the
+ * cluster. If the program uses user-defined functions,
+ * user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ */
+ public RemoteStreamEnvironment(String host, int port, Configuration config, String... jarFiles) {
+ if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+ throw new InvalidProgramException(
+ "The RemoteEnvironment cannot be used when submitting a program through a client, " +
+ "or running in a TestEnvironment context.");
+ }
+
if (host == null) {
throw new NullPointerException("Host must not be null.");
}
-
if (port < 1 || port >= 0xffff) {
throw new IllegalArgumentException("Port out of range");
}
this.host = host;
this.port = port;
- this.jarFiles = new ArrayList<File>();
+ this.config = config == null ? new Configuration() : config;
+ this.jarFiles = new ArrayList<File>(jarFiles.length);
for (String jarFile : jarFiles) {
File file = new File(jarFile);
try {
@@ -83,13 +117,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
}
@Override
- public JobExecutionResult execute() throws ProgramInvocationException {
- JobGraph jobGraph = getStreamGraph().getJobGraph();
- transformations.clear();
- return executeRemotely(jobGraph);
- }
-
- @Override
public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
transformations.clear();
@@ -112,9 +139,12 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
jobGraph.addJar(new Path(file.getAbsolutePath()));
}
- Configuration configuration = jobGraph.getJobConfiguration();
ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, getClass().getClassLoader());
-
+
+ Configuration configuration = new Configuration();
+ configuration.addAll(jobGraph.getJobConfiguration());
+ configuration.addAll(this.config);
+
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2a31390..5537fd4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -80,26 +80,46 @@ import java.util.List;
import java.util.Objects;
/**
- * {@link org.apache.flink.api.java.ExecutionEnvironment} for streaming jobs. An instance of it is
+ * An ExecutionEnvironment for streaming jobs. An instance of it is
* necessary to construct streaming topologies.
*/
+/**
+ * The StreamExecutionEnvironment is the context in which a streaming program is executed. A
+ * {@link LocalStreamEnvironment} will cause execution in the current JVM, a
+ * {@link RemoteStreamEnvironment} will cause execution on a remote setup.
+ *
+ * <p>The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
+ *
+ * @see org.apache.flink.streaming.api.environment.LocalStreamEnvironment
+ * @see org.apache.flink.streaming.api.environment.RemoteStreamEnvironment
+ */
public abstract class StreamExecutionEnvironment {
+ /** The default name to use for a streaming job if no other name has been specified */
public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
-
- private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
/** The time characteristic that is used if none other is set */
- private static TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
+ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
+
+ /** The default buffer timeout (max delay of records in the network stack) */
+ private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
+
+ /** The environment of the context (local by default, cluster if invoked through command line) */
+ private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
+
+ /** The default parallelism used when creating a local environment */
+ private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
// ------------------------------------------------------------------------
-
- private long bufferTimeout = 100;
+ /** The execution configuration for this environment */
private final ExecutionConfig config = new ExecutionConfig();
-
+
protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
-
+
+ private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
+
protected boolean isChainingEnabled = true;
protected long checkpointInterval = -1; // disabled
@@ -113,9 +133,7 @@ public abstract class StreamExecutionEnvironment {
/** The time characteristic used by the data streams */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
- /** The environment of the context (local by default, cluster if invoked through command line) */
- private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
-
+
// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------
@@ -1157,8 +1175,90 @@ public abstract class StreamExecutionEnvironment {
return new DataStreamSource<OUT>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
+ /**
+ * Triggers the program execution. The environment will execute all parts of
+ * the program that have resulted in a "sink" operation. Sink operations are
+ * for example printing results or forwarding them to a message queue.
+ * <p/>
+ * The program execution will be logged and displayed with a generated
+ * default name.
+ *
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception which occurs during job execution.
+ */
+ public JobExecutionResult execute() throws Exception {
+ return execute(DEFAULT_JOB_NAME);
+ }
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of
+ * the program that have resulted in a "sink" operation. Sink operations are
+ * for example printing results or forwarding them to a message queue.
+ * <p/>
+ * The program execution will be logged and displayed with the provided name
+ *
+ * @param jobName
+ * Desired name of the job
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception which occurs during job execution.
+ */
+ public abstract JobExecutionResult execute(String jobName) throws Exception;
+
+ /**
+ * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
+ *
+ * @return The streamgraph representing the transformations
+ */
+ public StreamGraph getStreamGraph() {
+ if (transformations.size() <= 0) {
+ throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
+ }
+ return StreamGraphGenerator.generate(this, transformations);
+ }
+
+ /**
+ * Creates the plan with which the system will execute the program, and
+ * returns it as a String using a JSON representation of the execution data
+ * flow graph. Note that this needs to be called, before the plan is
+ * executed.
+ *
+ * @return The execution plan of the program, as a JSON String.
+ */
+ public String getExecutionPlan() {
+ return getStreamGraph().getStreamingPlanAsJSON();
+ }
+
+ /**
+ * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+ * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+ */
+ public <F> F clean(F f) {
+ if (getConfig().isClosureCleanerEnabled()) {
+ ClosureCleaner.clean(f, true);
+ }
+ ClosureCleaner.ensureSerializable(f);
+ return f;
+ }
+
+ /**
+ * Adds an operator to the list of operators that should be executed when calling
+ * {@link #execute}.
+ *
+ * <p>
+ * When calling {@link #execute()} only the operators that where previously added to the list
+ * are executed.
+ *
+ * <p>
+ * This is not meant to be used by users. The API methods that create operators must call
+ * this method.
+ */
+ public void addOperator(StreamTransformation<?> transformation) {
+ Preconditions.checkNotNull(transformation, "transformation must not be null.");
+ this.transformations.add(transformation);
+ }
+
// --------------------------------------------------------------------------------------------
- // Instantiation of Execution Contexts
+ // Factory methods for ExecutionEnvironments
// --------------------------------------------------------------------------------------------
/**
@@ -1175,6 +1275,10 @@ public abstract class StreamExecutionEnvironment {
return contextEnvironmentFactory.createExecutionEnvironment();
}
+ // because the streaming project depends on "flink-clients" (and not the other way around)
+ // we currently need to intercept the data set environment and create a dependent stream env.
+ // this should be fixed once we rework the project dependencies
+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (env instanceof ContextEnvironment) {
ContextEnvironment ctx = (ContextEnvironment) env;
@@ -1187,8 +1291,9 @@ public abstract class StreamExecutionEnvironment {
}
}
- private static StreamExecutionEnvironment createContextEnvironment(Client client,
- List<File> jars, int parallelism, boolean wait) {
+ private static StreamExecutionEnvironment createContextEnvironment(
+ Client client, List<File> jars, int parallelism, boolean wait)
+ {
return new StreamContextEnvironment(client, jars, parallelism, wait);
}
@@ -1236,11 +1341,9 @@ public abstract class StreamExecutionEnvironment {
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration);
currentEnvironment.setParallelism(parallelism);
- return (LocalStreamEnvironment) currentEnvironment;
+ return currentEnvironment;
}
- // TODO:fix cluster default parallelism
-
/**
* Creates a {@link RemoteStreamEnvironment}. The remote environment sends
* (parts of) the program to a cluster for execution. Note that all file
@@ -1261,8 +1364,8 @@ public abstract class StreamExecutionEnvironment {
* provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
- public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
- String... jarFiles) {
+ public static StreamExecutionEnvironment createRemoteEnvironment(
+ String host, int port, String... jarFiles) {
return new RemoteStreamEnvironment(host, port, jarFiles);
}
@@ -1287,93 +1390,41 @@ public abstract class StreamExecutionEnvironment {
* provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
- public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
- int parallelism, String... jarFiles) {
+ public static StreamExecutionEnvironment createRemoteEnvironment(
+ String host, int port, int parallelism, String... jarFiles)
+ {
RemoteStreamEnvironment env = new RemoteStreamEnvironment(host, port, jarFiles);
env.setParallelism(parallelism);
return env;
}
/**
- * Triggers the program execution. The environment will execute all parts of
- * the program that have resulted in a "sink" operation. Sink operations are
- * for example printing results or forwarding them to a message queue.
- * <p/>
- * The program execution will be logged and displayed with a generated
- * default name.
- *
- * @return The result of the job execution, containing elapsed time and accumulators.
- * @throws Exception which occurs during job execution.
- */
- public abstract JobExecutionResult execute() throws Exception;
-
- /**
- * Triggers the program execution. The environment will execute all parts of
- * the program that have resulted in a "sink" operation. Sink operations are
- * for example printing results or forwarding them to a message queue.
- * <p/>
- * The program execution will be logged and displayed with the provided name
- *
- * @param jobName
- * Desired name of the job
- * @return The result of the job execution, containing elapsed time and accumulators.
- * @throws Exception which occurs during job execution.
- */
- public abstract JobExecutionResult execute(String jobName) throws Exception;
-
- /**
- * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
- *
- * @return The streamgraph representing the transformations
- */
- public StreamGraph getStreamGraph() {
- if (transformations.size() <= 0) {
- throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
- }
- return StreamGraphGenerator.generate(this, transformations);
- }
-
- /**
- * Creates the plan with which the system will execute the program, and
- * returns it as a String using a JSON representation of the execution data
- * flow graph. Note that this needs to be called, before the plan is
- * executed.
- *
- * @return The execution plan of the program, as a JSON String.
- */
- public String getExecutionPlan() {
- return getStreamGraph().getStreamingPlanAsJSON();
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
- */
- public <F> F clean(F f) {
- if (getConfig().isClosureCleanerEnabled()) {
- ClosureCleaner.clean(f, true);
- }
- ClosureCleaner.ensureSerializable(f);
- return f;
- }
-
- /**
- * Adds an operator to the list of operators that should be executed when calling
- * {@link #execute}.
- *
- * <p>
- * When calling {@link #execute()} only the operators that where previously added to the list
- * are executed.
+ * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
+ * (parts of) the program to a cluster for execution. Note that all file
+ * paths used in the program must be accessible from the cluster. The
+ * execution will use the specified parallelism.
*
- * <p>
- * This is not meant to be used by users. The API methods that create operators must call
- * this method.
+ * @param host
+ * The host name or address of the master (JobManager), where the
+ * program should be executed.
+ * @param port
+ * The port of the master (JobManager), where the program should
+ * be executed.
+ * @param clientConfig
+ * The configuration used by the client that connects to the remote cluster.
+ * @param jarFiles
+ * The JAR files with code that needs to be shipped to the
+ * cluster. If the program uses user-defined functions,
+ * user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
*/
- public void addOperator(StreamTransformation<?> transformation) {
- Preconditions.checkNotNull(transformation, "Sinks must not be null.");
- this.transformations.add(transformation);
+ public static StreamExecutionEnvironment createRemoteEnvironment(
+ String host, int port, Configuration clientConfig, String... jarFiles)
+ {
+ return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
}
-
+
// --------------------------------------------------------------------------------------------
// Methods to control the context and local environments for execution from packaged programs
// --------------------------------------------------------------------------------------------
@@ -1381,4 +1432,8 @@ public abstract class StreamExecutionEnvironment {
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
contextEnvironmentFactory = ctx;
}
+
+ protected static void resetContextEnvironment() {
+ contextEnvironmentFactory = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
deleted file mode 100644
index d4569fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility class to manage mini cluster for Apache Flink.
- */
-public final class ClusterUtil {
-
- private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
-
- /**
- * Executes the given JobGraph locally, on a FlinkMiniCluster
- *
- * @param jobGraph
- * jobGraph
- * @param parallelism
- * numberOfTaskTrackers
- * @param memorySize
- * memorySize
- * @param customConf
- * Custom configuration for the LocalExecutor. Can be null.
- * @return The result of the job execution, containing elapsed time and accumulators.
- */
- public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize,
- boolean printDuringExecution, boolean detached, Configuration customConf)
- throws Exception {
-
- Configuration configuration = jobGraph.getJobConfiguration();
-
- LocalFlinkMiniCluster exec = null;
-
- configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
- if(customConf != null) {
- configuration.addAll(customConf);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("Running on mini cluster");
- }
-
- try {
- exec = new LocalFlinkMiniCluster(configuration, true);
- exec.start();
-
- if (detached) {
- exec.submitJobDetached(jobGraph);
- return null;
- } else {
- return exec.submitJobAndWait(jobGraph, printDuringExecution);
- }
- } finally {
- if (exec != null && !detached) {
- exec.stop();
- }
- }
- }
-
- /**
- * Start a job in a detached mode on a local mini cluster.
- */
- public static void startOnMiniCluster(JobGraph jobGraph, int parallelism) throws Exception {
- runOnMiniCluster(jobGraph, parallelism, -1, true, true, null);
- }
-
- /**
- * Private constructor to prevent instantiation.
- */
- private ClusterUtil() {
- throw new RuntimeException();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index d251a5d..4e02f2c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -35,23 +35,22 @@ import org.junit.BeforeClass;
* one or more regular test methods and retrieve the StreamExecutionEnvironment from
* the context:
*
- * <pre>{@code
- *
- * @Test
+ * <pre>
+ * {@literal @}Test
* public void someTest() {
* StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
* // test code
* env.execute();
* }
*
- * @Test
+ * {@literal @}Test
* public void anotherTest() {
* StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
* // test code
* env.execute();
* }
*
- * }</pre>
+ * </pre>
*/
public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
@@ -61,32 +60,22 @@ public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
protected static final int DEFAULT_PARALLELISM = 4;
- protected static ForkableFlinkMiniCluster cluster = null;
-
- // ------------------------------------------------------------------------
+ protected static ForkableFlinkMiniCluster cluster;
- public StreamingMultipleProgramsTestBase() {
- TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, DEFAULT_PARALLELISM);
- clusterEnv.setAsContext();
- }
// ------------------------------------------------------------------------
// Cluster setup & teardown
// ------------------------------------------------------------------------
@BeforeClass
- public static void setup() throws Exception{
- cluster = TestBaseUtils.startCluster(
- 1,
- DEFAULT_PARALLELISM,
- StreamingMode.STREAMING,
- false,
- false,
- true);
+ public static void setup() throws Exception {
+ cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false, false, true);
+ TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
}
@AfterClass
public static void teardown() throws Exception {
+ TestStreamEnvironment.unsetAsContext();
stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index 92f8301..ce3aa86 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -18,33 +18,27 @@
package org.apache.flink.streaming.util;
-import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.test.util.AbstractTestBase;
-import org.junit.Assert;
import org.junit.Test;
-public abstract class StreamingProgramTestBase extends AbstractTestBase {
-
- private static final int DEFAULT_PARALLELISM = 4;
-
- private TestStreamEnvironment env;
+import static org.junit.Assert.fail;
- private JobExecutionResult latestExecutionResult;
-
- private int parallelism = DEFAULT_PARALLELISM;
+public abstract class StreamingProgramTestBase extends AbstractTestBase {
+ protected static final int DEFAULT_PARALLELISM = 4;
+ private int parallelism;
+
+
public StreamingProgramTestBase() {
- this(new Configuration());
+ super(new Configuration(), StreamingMode.STREAMING);
+ setParallelism(DEFAULT_PARALLELISM);
}
- public StreamingProgramTestBase(Configuration config) {
- super(config);
- setTaskManagerNumSlots(parallelism);
- }
-
+
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
setTaskManagerNumSlots(parallelism);
@@ -54,10 +48,6 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
return parallelism;
}
- public JobExecutionResult getLatestExecutionResult() {
- return this.latestExecutionResult;
- }
-
// --------------------------------------------------------------------------------------------
// Methods to create the test program and for pre- and post- test work
@@ -74,8 +64,7 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
// --------------------------------------------------------------------------------------------
@Test
- public void testJobWithoutObjectReuse() throws Exception {
- startCluster();
+ public void testJob() throws Exception {
try {
// pre-submit
try {
@@ -84,25 +73,26 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
- Assert.fail("Pre-submit work caused an error: " + e.getMessage());
+ fail("Pre-submit work caused an error: " + e.getMessage());
}
// prepare the test environment
- env = new TestStreamEnvironment(this.executor, this.parallelism);
- env.setAsContext();
+ startCluster();
+
+ TestStreamEnvironment.setAsContext(this.executor, getParallelism());
// call the test program
try {
testProgram();
- this.latestExecutionResult = env.latestResult;
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
- Assert.fail("Error while calling the test program: " + e.getMessage());
+ fail("Error while calling the test program: " + e.getMessage());
+ }
+ finally {
+ TestStreamEnvironment.unsetAsContext();
}
-
- Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult);
// post-submit
try {
@@ -111,13 +101,11 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
- Assert.fail("Post-submit work caused an error: " + e.getMessage());
- }
- } finally {
- if(env.clusterRunsSynchronous()) {
- stopCluster();
+ fail("Post-submit work caused an error: " + e.getMessage());
}
}
+ finally {
+ stopCluster();
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 91082d8..8cd1e4a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -20,150 +20,56 @@ package org.apache.flink.streaming.util;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+/**
+ * A StreamExecutionEnvironment that executes its jobs on a test cluster.
+ */
public class TestStreamEnvironment extends StreamExecutionEnvironment {
- private static final String DEFAULT_JOBNAME = "TestStreamingJob";
- private static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
-
- private long memorySize;
- protected JobExecutionResult latestResult;
+
+ /** The mini cluster in which this environment executes its jobs */
private ForkableFlinkMiniCluster executor;
- private boolean internalExecutor;
-
- public TestStreamEnvironment(int parallelism, long memorySize){
- setParallelism(parallelism);
- this.memorySize = memorySize;
- internalExecutor = true;
- }
+
- public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism){
+ public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
this.executor = Preconditions.checkNotNull(executor);
- setDefaultLocalParallelism(parallelism);
setParallelism(parallelism);
}
-
- @Override
- public JobExecutionResult execute() throws Exception {
- return execute(DEFAULT_JOBNAME);
- }
-
+
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- JobExecutionResult result = execute(getStreamGraph().getJobGraph(jobName));
- return result;
- }
-
- public JobExecutionResult execute(JobGraph jobGraph) throws Exception {
- if (internalExecutor) {
- Configuration configuration = jobGraph.getJobConfiguration();
-
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
- getParallelism());
- configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
-
- executor = new ForkableFlinkMiniCluster(configuration);
- executor.start();
- }
- try {
- sync = true;
- latestResult = executor.submitJobAndWait(jobGraph, false);
- return latestResult;
- } catch (JobExecutionException e) {
- if (e.getMessage().contains("GraphConversionException")) {
- throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
- } else {
- throw e;
- }
- } finally {
- transformations.clear();
- if (internalExecutor){
- executor.shutdown();
- }
- }
- }
-
- private ForkableFlinkMiniCluster cluster = null;
- private Thread jobRunner = null;
- private boolean sync = true;
-
- public void start(final JobGraph jobGraph) throws Exception {
- if(cluster != null) {
- throw new IllegalStateException("The cluster is already running");
- }
-
- if (internalExecutor) {
- Configuration configuration = jobGraph.getJobConfiguration();
-
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
- getParallelism());
- configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
-
- cluster = new ForkableFlinkMiniCluster(configuration);
- } else {
- cluster = executor;
- }
- try {
- sync = false;
-
- jobRunner = new Thread() {
- public void run() {
- try {
- latestResult = cluster.submitJobAndWait(jobGraph, false);
- } catch (JobExecutionException e) {
- // TODO remove: hack to make ITCase succeed because .submitJobAndWait() throws exception on .stop() (see this.shutdown())
- latestResult = new JobExecutionResult(null, 0, null);
- e.printStackTrace();
- //throw new RuntimeException(e);
- } catch (Exception e) {
- new RuntimeException(e);
- }
- }
- };
- jobRunner.start();
- } catch(RuntimeException e) {
- if (e.getCause().getMessage().contains("GraphConversionException")) {
- throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
- } else {
- throw e;
- }
- }
- }
-
- public JobExecutionResult shutdown() throws InterruptedException {
- if(!sync) {
- cluster.stop();
- cluster = null;
-
- jobRunner.join();
- jobRunner = null;
-
- return latestResult;
- }
-
- throw new IllegalStateException("Cluster was not started via .start(...)");
- }
-
- public boolean clusterRunsSynchronous() {
- return sync;
+ final JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
+ return executor.submitJobAndWait(jobGraph, false);
}
- protected void setAsContext() {
+ // ------------------------------------------------------------------------
+
+ /**
+ * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
+ * the given cluster with the given default parallelism.
+ *
+ * @param cluster The test cluster to run the test program on.
+ * @param parallelism The default parallelism for the test programs.
+ */
+ public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
+
StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
@Override
public StreamExecutionEnvironment createExecutionEnvironment() {
- return TestStreamEnvironment.this;
+ return new TestStreamEnvironment(cluster, parallelism);
}
};
initializeContextEnvironment(factory);
}
+ /**
+ * Resets the streaming context environment to null.
+ */
+ public static void unsetAsContext() {
+ resetContextEnvironment();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
index fb0ab0a..37812c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExamp
import org.apache.flink.streaming.util.StreamingProgramTestBase;
public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
+
protected String textPath;
protected String resultPath;
@@ -40,6 +41,5 @@ public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
@Override
protected void testProgram() throws Exception {
TopSpeedWindowing.main(new String[]{textPath, resultPath});
-
}
}