You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/20 16:49:07 UTC
[02/10] flink git commit: [FLINK-4669] [apis] Add
createLocalEnvironment() utility method that starts the web UI
[FLINK-4669] [apis] Add createLocalEnvironment() utility method that starts the web UI
This closes #2541
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/570ed4e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/570ed4e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/570ed4e2
Branch: refs/heads/master
Commit: 570ed4e23a22381374c7a009f7cd0d743699891c
Parents: e42a173
Author: shijinkui <sh...@huawei.com>
Authored: Tue Nov 29 12:56:25 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:16 2016 +0100
----------------------------------------------------------------------
.../flink/api/java/ExecutionEnvironment.java | 23 ++++++++++++++++++-
.../flink/api/scala/ExecutionEnvironment.scala | 24 ++++++++++++++++++--
.../environment/StreamExecutionEnvironment.java | 21 +++++++++++++++++
.../api/scala/StreamExecutionEnvironment.scala | 23 +++++++++++++++++++
4 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 964eed1..0ac9ccf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
@@ -1216,7 +1217,27 @@ public abstract class ExecutionEnvironment {
public static LocalEnvironment createLocalEnvironment(Configuration customConfiguration) {
return new LocalEnvironment(customConfiguration);
}
-
+
+ /**
+ * Creates a local execution environment that also starts the web UI.
+ *
+ * @return a local {@link ExecutionEnvironment} with the web UI enabled
+ */
+ public static ExecutionEnvironment createLocalEnvWithWebUI(Configuration conf) {
+ if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+ int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+ conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
+ }
+
+ conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+ LocalEnvironment localEnv = new LocalEnvironment(conf);
+ if (localEnv.getConfig().getParallelism() < 0) {
+ localEnv.setParallelism(defaultLocalDop);
+ }
+
+ return localEnv;
+ }
+
/**
* Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
* to a cluster for execution. Note that all file paths used in the program must be accessible from the
http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 18aab07..a61c65d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
import org.apache.flink.api.scala.hadoop.{mapred, mapreduce}
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.core.fs.Path
import org.apache.flink.types.StringValue
import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
@@ -748,10 +748,30 @@ object ExecutionEnvironment {
}
/**
+ * Creates a local execution environment that also starts the web UI.
+ *
+ * @return a local [[ExecutionEnvironment]] with the web UI enabled
+ */
+ def createLocalEnvWithWebUI(confOps: Option[Configuration] = None): ExecutionEnvironment = {
+ val conf = confOps match {
+ case Some(cf) => cf
+ case None => new Configuration
+ }
+
+ conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
+ if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+ val port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT
+ conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port)
+ }
+
+ new ExecutionEnvironment(JavaEnv.createLocalEnvironment(conf))
+ }
+
+ /**
* Creates an execution environment that uses Java Collections underneath. This will execute in a
* single thread in the current JVM. It is very fast but will fail if the data does not fit into
* memory. This is useful during implementation and for debugging.
- *
+ *
* @return
*/
@PublicEvolving
http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 99784e9..6c35b09 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -1645,6 +1646,26 @@ public abstract class StreamExecutionEnvironment {
}
/**
+ * Creates a local execution environment that also starts the web UI.
+ *
+ * @return a local {@link StreamExecutionEnvironment} with the web UI enabled
+ */
+ public static StreamExecutionEnvironment createLocalEnvWithWebUI(Configuration conf) {
+ if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+ int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+ conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
+ }
+
+ conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+ LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
+ if (localEnv.getConfig().getParallelism() < 0) {
+ localEnv.setParallelism(defaultLocalParallelism);
+ }
+
+ return localEnv;
+ }
+
+ /**
* Creates a {@link RemoteStreamEnvironment}. The remote environment sends
* (parts of) the program to a cluster for execution. Note that all file
* paths used in the program must be accessible from the cluster. The
http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 432e8ac..d8aa1eb 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStra
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.state.AbstractStateBackend
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source._
@@ -705,6 +706,28 @@ object StreamExecutionEnvironment {
}
/**
+ * Creates a local execution environment that also starts the web UI.
+ *
+ * @param confOps optional Flink configuration; a new empty one is used when absent
+ * @return a local [[StreamExecutionEnvironment]] with the web UI enabled
+ */
+ def createLocalEnvWithWebUI(confOps: Option[Configuration] = None): StreamExecutionEnvironment = {
+ val conf = confOps match {
+ case Some(cf) => cf
+ case None => new Configuration
+ }
+
+ conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
+ if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+ val port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT
+ conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port)
+ }
+
+ val parallelism = Runtime.getRuntime.availableProcessors()
+ new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, conf))
+ }
+
+ /**
* Creates a remote execution environment. The remote environment sends (parts of) the program to
* a cluster for execution. Note that all file paths used in the program must be accessible from
* the cluster. The execution will use the cluster's default parallelism, unless the