You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:53 UTC
[04/10] flink git commit: [FLINK-5141] [streaming api] Implement
LocalStreamEnvironment for new mini cluster.
[FLINK-5141] [streaming api] Implement LocalStreamEnvironment for new mini cluster.
This closes #2877
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0086b57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0086b57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0086b57
Branch: refs/heads/flip-6
Commit: c0086b57eec63bab627383205eed2ff8636c5394
Parents: 4afcc4a
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Wed Nov 23 17:02:11 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../Flip6LocalStreamEnvironment.java | 128 +++++++++++++++++++
1 file changed, 128 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0086b57/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
new file mode 100644
index 0000000..a0c128e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Flip6LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally,
+ * multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded
+ * Flink cluster in the background and executes the program on that cluster.
+ *
+ * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
+ * parallelism can be set via {@link #setParallelism(int)}.
+ */
+@Public
+public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);
+
+ /** The configuration to use for the mini cluster */
+ private final Configuration conf;
+
+ /**
+ * Creates a new mini cluster stream environment that uses the default configuration.
+ */
+ public Flip6LocalStreamEnvironment() {
+ this(null);
+ }
+
+ /**
+ * Creates a new mini cluster stream environment that configures its local executor with the given configuration.
+ *
+ * @param config The configuration used to configure the local executor.
+ */
+ public Flip6LocalStreamEnvironment(Configuration config) {
+ if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+ throw new InvalidProgramException(
+ "The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +
+ "or running in a TestEnvironment context.");
+ }
+
+ this.conf = config == null ? new Configuration() : config;
+ }
+
+ /**
+ * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
+ * specified name.
+ *
+ * @param jobName
+ * name of the job
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ */
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ // transform the streaming program into a JobGraph
+ StreamGraph streamGraph = getStreamGraph();
+ streamGraph.setJobName(jobName);
+
+ JobGraph jobGraph = streamGraph.getJobGraph();
+
+ jobGraph.setAllowQueuedScheduling(true);
+
+ // As jira FLINK-5140 described,
+ // we have to set restart strategy to handle NoResourceAvailableException.
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.setRestartStrategy(
+ RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+ jobGraph.setExecutionConfig(executionConfig);
+
+ Configuration configuration = new Configuration();
+ configuration.addAll(jobGraph.getJobConfiguration());
+ configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+
+ // add (and override) the settings with what the user defined
+ configuration.addAll(this.conf);
+
+ MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration);
+
+ // Currently we do not reuse slot anymore, so we need to sum all parallelism of vertices up.
+ int slotsCount = 0;
+ for (JobVertex jobVertex : jobGraph.getVertices()) {
+ slotsCount += jobVertex.getParallelism();
+ }
+ cfg.setNumTaskManagerSlots(slotsCount);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Running job on local embedded Flink mini cluster");
+ }
+
+ MiniCluster miniCluster = new MiniCluster(cfg);
+ try {
+ miniCluster.start();
+ return miniCluster.runJobBlocking(jobGraph);
+ } finally {
+ transformations.clear();
+ miniCluster.shutdown();
+ }
+ }
+}