You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/04/26 11:11:57 UTC
flink git commit: [FLINK-3775] [shell] Load Flink configuration
before forwarding it
Repository: flink
Updated Branches:
refs/heads/master 8330539e4 -> 7abf8ef77
[FLINK-3775] [shell] Load Flink configuration before forwarding it
This commit makes sure that the GlobalConfiguration is loaded before the FlinkShell
is started.
Add configDir option to FlinkShell
This allows users to specify a configuration directory for the FlinkShell. If the
CLI option is not set, the system tries to find the configuration directory, first
via the FLINK_CONF_DIR environment variable and then in the standard directories.
Add Apache license header to dummy flink-conf.yaml file
This closes #1914.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7abf8ef7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7abf8ef7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7abf8ef7
Branch: refs/heads/master
Commit: 7abf8ef774f14d688213c41b6986ee64aba98740
Parents: 8330539
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Apr 19 15:04:37 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Apr 26 11:11:13 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/api/scala/FlinkShell.scala | 54 ++++++++++++--------
.../src/test/resources/flink-conf.yaml | 17 ++++++
.../flink/api/scala/ScalaShellITCase.scala | 9 +++-
.../scala/ScalaShellLocalStartupITCase.scala | 18 ++++---
4 files changed, 71 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7abf8ef7/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 2c2fbb3..2618b09 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -42,7 +42,8 @@ object FlinkShell {
port: Option[Int] = None,
externalJars: Option[Array[String]] = None,
executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
- yarnConfig: Option[YarnConfig] = None
+ yarnConfig: Option[YarnConfig] = None,
+ configDir: Option[String] = None
)
/** YARN configuration object */
@@ -64,62 +65,68 @@ object FlinkShell {
cmd("local") action {
(_, c) => c.copy(executionMode = ExecutionMode.LOCAL)
- } text("Starts Flink scala shell with a local Flink cluster") children(
+ } text "Starts Flink scala shell with a local Flink cluster" children(
opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
case (x, c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
- } text("Specifies additional jars to be used in Flink")
+ } text "Specifies additional jars to be used in Flink"
)
cmd("remote") action { (_, c) =>
c.copy(executionMode = ExecutionMode.REMOTE)
- } text("Starts Flink scala shell connecting to a remote cluster") children(
+ } text "Starts Flink scala shell connecting to a remote cluster" children(
arg[String]("<host>") action { (h, c) =>
c.copy(host = Some(h)) }
- text("Remote host name as string"),
+ text "Remote host name as string",
arg[Int]("<port>") action { (p, c) =>
c.copy(port = Some(p)) }
- text("Remote port as integer\n"),
- opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+ text "Remote port as integer\n",
+ opt[String]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
case (x, c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
- } text ("Specifies additional jars to be used in Flink")
+ } text "Specifies additional jars to be used in Flink"
)
cmd("yarn") action {
(_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = None)
- } text ("Starts Flink scala shell connecting to a yarn cluster") children(
+ } text "Starts Flink scala shell connecting to a yarn cluster" children(
opt[Int]("container") abbr ("n") valueName ("arg") action {
(x, c) =>
c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = Some(x))))
- } text ("Number of YARN container to allocate (= Number of TaskManagers)"),
+ } text "Number of YARN container to allocate (= Number of TaskManagers)",
opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
(x, c) =>
c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
- } text ("Memory for JobManager container [in MB]"),
+ } text "Memory for JobManager container [in MB]",
opt[String]("name") abbr ("nm") action {
(x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name = Some(x))))
- } text ("Set a custom name for the application on YARN"),
+ } text "Set a custom name for the application on YARN",
opt[String]("queue") abbr ("qu") valueName ("<arg>") action {
(x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(queue = Some(x))))
- } text ("Specifies YARN queue"),
+ } text "Specifies YARN queue",
opt[Int]("slots") abbr ("s") valueName ("<arg>") action {
(x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(x))))
- } text ("Number of slots per TaskManager"),
+ } text "Number of slots per TaskManager",
opt[Int]("taskManagerMemory") abbr ("tm") valueName ("<arg>") action {
(x, c) =>
c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
- } text ("Memory per TaskManager container [in MB]"),
+ } text "Memory per TaskManager container [in MB]",
opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
case (x, c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
- } text("Specifies additional jars to be used in Flink")
+ } text "Specifies additional jars to be used in Flink"
)
- help("help") abbr ("h") text ("Prints this usage text")
+ opt[String]("configDir").optional().action {
+ (arg, conf) => conf.copy(configDir = Option(arg))
+ } text {
+ "The configuration directory."
+ }
+
+ help("help") abbr ("h") text "Prints this usage text"
}
// parse arguments
@@ -167,6 +174,15 @@ object FlinkShell {
def startShell(config: Config): Unit = {
println("Starting Flink Shell:")
+ // load global configuration
+ val confDirPath = config.configDir match {
+ case Some(confDir) => confDir
+ case None => CliFrontend.getConfigurationDirectoryFromEnv
+ }
+
+ val configDirectory = new File(confDirPath)
+ GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath)
+
val (repl, cluster) = try {
val (host, port, cluster) = fetchConnectionInfo(config)
val conf = cluster match {
@@ -216,13 +232,11 @@ object FlinkShell {
val jarPath = new Path("file://" +
s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}")
yarnClient.setLocalJarPath(jarPath)
-
- // load configuration
+
val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv
val flinkConfiguration = GlobalConfiguration.getConfiguration
val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
val confPath = new Path(confFile.getAbsolutePath)
- GlobalConfiguration.loadConfiguration(confDirPath)
yarnClient.setFlinkConfiguration(flinkConfiguration)
yarnClient.setConfigurationDirectory(confDirPath)
yarnClient.setConfigurationFilePath(confPath)
http://git-wip-us.apache.org/repos/asf/flink/blob/7abf8ef7/flink-scala-shell/src/test/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/resources/flink-conf.yaml b/flink-scala-shell/src/test/resources/flink-conf.yaml
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-scala-shell/src/test/resources/flink-conf.yaml
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
http://git-wip-us.apache.org/repos/asf/flink/blob/7abf8ef7/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 6642cff..6effce7 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -287,11 +287,18 @@ class ScalaShellITCase extends TestLogger {
val oldOut: PrintStream = System.out
System.setOut(new PrintStream(baos))
+ val confFile: String = classOf[ScalaShellLocalStartupITCase]
+ .getResource("/flink-conf.yaml")
+ .getFile
+ val confDir = new File(confFile).getAbsoluteFile.getParent
+
val (c, args) = cluster match{
case Some(cl) =>
val arg = Array("remote",
cl.hostname,
- Integer.toString(cl.getLeaderRPCPort))
+ Integer.toString(cl.getLeaderRPCPort),
+ "--configDir",
+ confDir)
(cl, arg)
case None =>
throw new AssertionError("Cluster creation failed.")
http://git-wip-us.apache.org/repos/asf/flink/blob/7abf8ef7/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index 0e7dd56..6f44bfe 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -71,12 +71,18 @@ class ScalaShellLocalStartupITCase extends TestLogger {
|
|:q
""".stripMargin
- val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
- val out: StringWriter = new StringWriter
- val baos: ByteArrayOutputStream = new ByteArrayOutputStream
- val oldOut: PrintStream = System.out
- System.setOut(new PrintStream(baos))
- val args: Array[String] = Array("local")
+ val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
+ val out: StringWriter = new StringWriter
+ val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+ val oldOut: PrintStream = System.out
+ System.setOut(new PrintStream(baos))
+
+ val confFile: String = classOf[ScalaShellLocalStartupITCase]
+ .getResource("/flink-conf.yaml")
+ .getFile
+ val confDir = new File(confFile).getAbsoluteFile.getParent
+
+ val args: Array[String] = Array("local", "--configDir", confDir)
//start flink scala shell
FlinkShell.bufferedReader = Some(in);