You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:54 UTC

[35/52] [abbrv] flink git commit: [FLINK-5141] [streaming api] Implement LocalStreamEnvironment for new mini cluster.

[FLINK-5141] [streaming api] Implement LocalStreamEnvironment for new mini cluster.

This closes #2877


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73d27d7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73d27d7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73d27d7c

Branch: refs/heads/master
Commit: 73d27d7cd7680858d111c6442a2aa6b6b94c7cef
Parents: 4d3a3ee
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Wed Nov 23 17:02:11 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:26 2016 +0100

----------------------------------------------------------------------
 .../Flip6LocalStreamEnvironment.java            | 128 +++++++++++++++++++
 1 file changed, 128 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73d27d7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
new file mode 100644
index 0000000..a0c128e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Flip6LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally,
+ * multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded
+ * Flink cluster in the background and executes the program on that cluster.
+ *
+ * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
+ * parallelism can be set via {@link #setParallelism(int)}.
+ */
+@Public
+public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);
+
+	/** The user-supplied settings that are layered on top of the mini cluster configuration. */
+	private final Configuration conf;
+
+	/**
+	 * Creates a new mini cluster stream environment that uses the default configuration.
+	 */
+	public Flip6LocalStreamEnvironment() {
+		this(null);
+	}
+
+	/**
+	 * Creates a new mini cluster stream environment that configures its local executor with the given configuration.
+	 *
+	 * @param config The configuration used to configure the local executor, or {@code null} for an empty one.
+	 */
+	public Flip6LocalStreamEnvironment(Configuration config) {
+		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+			throw new InvalidProgramException(
+					"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +
+							"or running in a TestEnvironment context.");
+		}
+
+		this.conf = config == null ? new Configuration() : config;
+	}
+
+	/**
+	 * Executes the streaming program under the given job name on an embedded mini cluster
+	 * that is started for this call and shut down again once the job has finished or failed.
+	 *
+	 * @param jobName name of the job
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception any failure while building, starting the cluster for, or running the job
+	 */
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		// transform the streaming program into a JobGraph
+		StreamGraph streamGraph = getStreamGraph();
+		streamGraph.setJobName(jobName);
+
+		JobGraph jobGraph = streamGraph.getJobGraph();
+
+		jobGraph.setAllowQueuedScheduling(true);
+
+		// As jira FLINK-5140 described,
+		// we have to set restart strategy to handle NoResourceAvailableException.
+		// Reuse the environment's ExecutionConfig rather than a blank one, so that the user's other
+		// settings are not dropped; note that this overrides any user-defined restart strategy.
+		ExecutionConfig executionConfig = getConfig();
+		executionConfig.setRestartStrategy(
+			RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+		jobGraph.setExecutionConfig(executionConfig);
+
+		Configuration configuration = new Configuration();
+		configuration.addAll(jobGraph.getJobConfiguration());
+		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+
+		// add (and override) the settings with what the user defined
+		configuration.addAll(this.conf);
+
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration);
+
+		// Currently we do not reuse slot anymore, so we need to sum all parallelism of vertices up.
+		int slotsCount = 0;
+		for (JobVertex jobVertex : jobGraph.getVertices()) {
+			slotsCount += jobVertex.getParallelism();
+		}
+		cfg.setNumTaskManagerSlots(slotsCount);
+
+		LOG.info("Running job on local embedded Flink mini cluster");
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		try {
+			miniCluster.start();
+			return miniCluster.runJobBlocking(jobGraph);
+		} finally {
+			transformations.clear();
+			miniCluster.shutdown();
+		}
+	}
+}