You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/04/26 11:11:57 UTC

flink git commit: [FLINK-3775] [shell] Load Flink configuration before forwarding it

Repository: flink
Updated Branches:
  refs/heads/master 8330539e4 -> 7abf8ef77


[FLINK-3775] [shell] Load Flink configuration before forwarding it

This commit makes sure that the GlobalConfiguration is loaded before the FlinkShell
is started.

Add configDir option to FlinkShell

This allows to configure a configuration directory for the FlinkShell. If the
CLI option is not set, then the system tries to find the configuration directory
using first the FLINK_CONF_DIR environment variable and then the standard directories.

Add Apache license header to dummy flink-conf.yaml file

This closes #1914.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7abf8ef7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7abf8ef7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7abf8ef7

Branch: refs/heads/master
Commit: 7abf8ef774f14d688213c41b6986ee64aba98740
Parents: 8330539
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Apr 19 15:04:37 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Apr 26 11:11:13 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/FlinkShell.scala | 54 ++++++++++++--------
 .../src/test/resources/flink-conf.yaml          | 17 ++++++
 .../flink/api/scala/ScalaShellITCase.scala      |  9 +++-
 .../scala/ScalaShellLocalStartupITCase.scala    | 18 ++++---
 4 files changed, 71 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7abf8ef7/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 2c2fbb3..2618b09 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -42,7 +42,8 @@ object FlinkShell {
     port: Option[Int] = None,
     externalJars: Option[Array[String]] = None,
     executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
-    yarnConfig: Option[YarnConfig] = None
+    yarnConfig: Option[YarnConfig] = None,
+    configDir: Option[String] = None
   )
 
   /** YARN configuration object */
@@ -64,62 +65,68 @@ object FlinkShell {
 
       cmd("local") action {
         (_, c) => c.copy(executionMode = ExecutionMode.LOCAL)
-      } text("Starts Flink scala shell with a local Flink cluster") children(
+      } text "Starts Flink scala shell with a local Flink cluster" children(
         opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
           case (x, c) =>
             val xArray = x.split(":")
             c.copy(externalJars = Option(xArray))
-          } text("Specifies additional jars to be used in Flink")
+          } text "Specifies additional jars to be used in Flink"
         )
 
       cmd("remote") action { (_, c) =>
         c.copy(executionMode = ExecutionMode.REMOTE)
-      } text("Starts Flink scala shell connecting to a remote cluster") children(
+      } text "Starts Flink scala shell connecting to a remote cluster" children(
         arg[String]("<host>") action { (h, c) =>
           c.copy(host = Some(h)) }
-          text("Remote host name as string"),
+          text "Remote host name as string",
         arg[Int]("<port>") action { (p, c) =>
           c.copy(port = Some(p)) }
-          text("Remote port as integer\n"),
-        opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          text "Remote port as integer\n",
+        opt[String]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
           case (x, c) =>
             val xArray = x.split(":")
             c.copy(externalJars = Option(xArray))
-        } text ("Specifies additional jars to be used in Flink")
+        } text "Specifies additional jars to be used in Flink"
       )
 
       cmd("yarn") action {
         (_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = None)
-      } text ("Starts Flink scala shell connecting to a yarn cluster") children(
+      } text "Starts Flink scala shell connecting to a yarn cluster" children(
         opt[Int]("container") abbr ("n") valueName ("arg") action {
           (x, c) =>
             c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = Some(x))))
-        } text ("Number of YARN container to allocate (= Number of TaskManagers)"),
+        } text "Number of YARN container to allocate (= Number of TaskManagers)",
         opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
           (x, c) =>
             c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
-        } text ("Memory for JobManager container [in MB]"),
+        } text "Memory for JobManager container [in MB]",
         opt[String]("name") abbr ("nm") action {
           (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name = Some(x))))
-        } text ("Set a custom name for the application on YARN"),
+        } text "Set a custom name for the application on YARN",
         opt[String]("queue") abbr ("qu") valueName ("<arg>") action {
           (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(queue = Some(x))))
-        } text ("Specifies YARN queue"),
+        } text "Specifies YARN queue",
         opt[Int]("slots") abbr ("s") valueName ("<arg>") action {
           (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(x))))
-        } text ("Number of slots per TaskManager"),
+        } text "Number of slots per TaskManager",
         opt[Int]("taskManagerMemory") abbr ("tm") valueName ("<arg>") action {
           (x, c) =>
             c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
-        } text ("Memory per TaskManager container [in MB]"),
+        } text "Memory per TaskManager container [in MB]",
         opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
           case (x, c) =>
             val xArray = x.split(":")
             c.copy(externalJars = Option(xArray))
-        } text("Specifies additional jars to be used in Flink")
+        } text "Specifies additional jars to be used in Flink"
       )
 
-      help("help") abbr ("h") text ("Prints this usage text")
+      opt[String]("configDir").optional().action {
+        (arg, conf) => conf.copy(configDir = Option(arg))
+      } text {
+        "The configuration directory."
+      }
+
+      help("help") abbr ("h") text "Prints this usage text"
     }
 
     // parse arguments
@@ -167,6 +174,15 @@ object FlinkShell {
   def startShell(config: Config): Unit = {
     println("Starting Flink Shell:")
 
+    // load global configuration
+    val confDirPath = config.configDir match {
+      case Some(confDir) => confDir
+      case None => CliFrontend.getConfigurationDirectoryFromEnv
+    }
+
+    val configDirectory = new File(confDirPath)
+    GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath)
+
     val (repl, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(config)
       val conf = cluster match {
@@ -216,13 +232,11 @@ object FlinkShell {
     val jarPath = new Path("file://" +
       s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}")
     yarnClient.setLocalJarPath(jarPath)
-
-    // load configuration
+    
     val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv
     val flinkConfiguration = GlobalConfiguration.getConfiguration
     val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
     val confPath = new Path(confFile.getAbsolutePath)
-    GlobalConfiguration.loadConfiguration(confDirPath)
     yarnClient.setFlinkConfiguration(flinkConfiguration)
     yarnClient.setConfigurationDirectory(confDirPath)
     yarnClient.setConfigurationFilePath(confPath)

http://git-wip-us.apache.org/repos/asf/flink/blob/7abf8ef7/flink-scala-shell/src/test/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/resources/flink-conf.yaml b/flink-scala-shell/src/test/resources/flink-conf.yaml
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-scala-shell/src/test/resources/flink-conf.yaml
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################

http://git-wip-us.apache.org/repos/asf/flink/blob/7abf8ef7/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 6642cff..6effce7 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -287,11 +287,18 @@ class ScalaShellITCase extends TestLogger {
     val oldOut: PrintStream = System.out
     System.setOut(new PrintStream(baos))
 
+    val confFile: String = classOf[ScalaShellLocalStartupITCase]
+      .getResource("/flink-conf.yaml")
+      .getFile
+    val confDir = new File(confFile).getAbsoluteFile.getParent
+
     val (c, args) = cluster match{
       case Some(cl) =>
         val arg = Array("remote",
           cl.hostname,
-          Integer.toString(cl.getLeaderRPCPort))
+          Integer.toString(cl.getLeaderRPCPort),
+          "--configDir",
+          confDir)
         (cl, arg)
       case None =>
         throw new AssertionError("Cluster creation failed.")

http://git-wip-us.apache.org/repos/asf/flink/blob/7abf8ef7/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index 0e7dd56..6f44bfe 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -71,12 +71,18 @@ class ScalaShellLocalStartupITCase extends TestLogger {
         |
         |:q
       """.stripMargin
-      val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
-      val out: StringWriter = new StringWriter
-      val baos: ByteArrayOutputStream = new ByteArrayOutputStream
-      val oldOut: PrintStream = System.out
-      System.setOut(new PrintStream(baos))
-      val args: Array[String] = Array("local")
+    val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
+    val out: StringWriter = new StringWriter
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+
+    val confFile: String = classOf[ScalaShellLocalStartupITCase]
+      .getResource("/flink-conf.yaml")
+      .getFile
+    val confDir = new File(confFile).getAbsoluteFile.getParent
+
+    val args: Array[String] = Array("local", "--configDir", confDir)
 
     //start flink scala shell
     FlinkShell.bufferedReader = Some(in);