You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/20 16:49:07 UTC

[02/10] flink git commit: [FLINK-4669] [apis] Add createLocalEnvironment() utility method that starts the web UI

[FLINK-4669] [apis] Add createLocalEnvironment() utility method that starts the web UI

This closes #2541


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/570ed4e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/570ed4e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/570ed4e2

Branch: refs/heads/master
Commit: 570ed4e23a22381374c7a009f7cd0d743699891c
Parents: e42a173
Author: shijinkui <sh...@huawei.com>
Authored: Tue Nov 29 12:56:25 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:01:16 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/ExecutionEnvironment.java    | 23 ++++++++++++++++++-
 .../flink/api/scala/ExecutionEnvironment.scala  | 24 ++++++++++++++++++--
 .../environment/StreamExecutionEnvironment.java | 21 +++++++++++++++++
 .../api/scala/StreamExecutionEnvironment.scala  | 23 +++++++++++++++++++
 4 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 964eed1..0ac9ccf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
@@ -1216,7 +1217,27 @@ public abstract class ExecutionEnvironment {
 	public static LocalEnvironment createLocalEnvironment(Configuration customConfiguration) {
 		return new LocalEnvironment(customConfiguration);
 	}
-	
+
+	/**
+	 * Creates a local execution environment with enable running web UI
+	 *
+	 * @return [[StreamExecutionEnvironment]]
+	 */
+	public static ExecutionEnvironment createLocalEnvWithWebUI(Configuration conf) {
+		if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+			int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+			conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
+		}
+
+		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+		LocalEnvironment localEnv = new LocalEnvironment(conf);
+		if (localEnv.getConfig().getParallelism() < 0) {
+			localEnv.setParallelism(defaultLocalDop);
+		}
+
+		return localEnv;
+	}
+
 	/**
 	 * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program 
 	 * to a cluster for execution. Note that all file paths used in the program must be accessible from the

http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 18aab07..a61c65d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
 import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
 import org.apache.flink.api.scala.hadoop.{mapred, mapreduce}
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
@@ -748,10 +748,30 @@ object ExecutionEnvironment {
   }
 
   /**
+   * Creates a local execution environment with enable running web UI
+   *
+   * @return [[ExecutionEnvironment]]
+   */
+  def createLocalEnvWithWebUI(confOps: Option[Configuration] = None): ExecutionEnvironment = {
+    val conf = confOps match {
+      case Some(cf) => cf
+      case None => new Configuration
+    }
+
+    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
+    if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+      val port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT
+      conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port)
+    }
+
+    new ExecutionEnvironment(JavaEnv.createLocalEnvironment(conf))
+  }
+
+  /**
    * Creates an execution environment that uses Java Collections underneath. This will execute in a
    * single thread in the current JVM. It is very fast but will fail if the data does not fit into
    * memory. This is useful during implementation and for debugging.
- *
+   *
    * @return
    */
   @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 99784e9..6c35b09 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -1645,6 +1646,26 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Creates a local execution environment with enable running web UI
+	 *
+	 * @return [[StreamExecutionEnvironment]]
+	 */
+	public static StreamExecutionEnvironment createLocalEnvWithWebUI(Configuration conf) {
+		if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+			int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+			conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
+		}
+
+		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+		LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
+		if (localEnv.getConfig().getParallelism() < 0) {
+			localEnv.setParallelism(defaultLocalParallelism);
+		}
+
+		return localEnv;
+	}
+
+	/**
 	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
 	 * (parts of) the program to a cluster for execution. Note that all file
 	 * paths used in the program must be accessible from the cluster. The

http://git-wip-us.apache.org/repos/asf/flink/blob/570ed4e2/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 432e8ac..d8aa1eb 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStra
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.state.AbstractStateBackend
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source._
@@ -705,6 +706,28 @@ object StreamExecutionEnvironment {
   }
 
   /**
+   * Creates a local execution environment with enable running web UI
+   *
+   * @param confOps optional config of Flink
+   * @return [[StreamExecutionEnvironment]]
+   */
+  def createLocalEnvWithWebUI(confOps: Option[Configuration] = None): StreamExecutionEnvironment = {
+    val conf = confOps match {
+      case Some(cf) => cf
+      case None => new Configuration
+    }
+
+    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
+    if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+      val port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT
+      conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port)
+    }
+
+    val parallelism = Runtime.getRuntime.availableProcessors()
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, conf))
+  }
+
+  /**
    * Creates a remote execution environment. The remote environment sends (parts of) the program to
    * a cluster for execution. Note that all file paths used in the program must be accessible from
    * the cluster. The execution will use the cluster's default parallelism, unless the