You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/06/11 18:53:16 UTC

[flink] branch release-1.11 updated: [FLINK-17788][scala-shell] Fix yarn session support in scala shell

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 3d08530  [FLINK-17788][scala-shell] Fix yarn session support in scala shell
3d08530 is described below

commit 3d0853035c1a70522bf239ced4b17c0f67c5e025
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Jun 10 15:25:39 2020 +0200

    [FLINK-17788][scala-shell] Fix yarn session support in scala shell
---
 .../src/main/scala/org/apache/flink/api/scala/FlinkShell.scala   | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index cfd91a5..f77b938 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -222,12 +222,14 @@ object FlinkShell {
       case None => fetchDeployedYarnClusterInfo(config, clusterConfig, "default")
     }
 
+    println("Configuration: " + effectiveConfig)
+
     (effectiveConfig, clusterClient)
   }
 
   private def deployNewYarnCluster(config: Config, flinkConfig: Configuration) = {
     val effectiveConfig = new Configuration(flinkConfig)
-    val args = parseYarnArgList(config, "yarn-cluster")
+    val args = parseArgList(config, "yarn-cluster")
 
     val configurationDirectory = getConfigDir(config)
 
@@ -253,6 +255,7 @@ object FlinkShell {
         .deploySessionCluster(clusterSpecification)
         .getClusterClient
     } finally {
+      executorConfig.set(DeploymentOptions.TARGET, "yarn-session")
       clusterDescriptor.close()
     }
 
@@ -265,7 +268,7 @@ object FlinkShell {
       mode: String) = {
 
     val effectiveConfig = new Configuration(flinkConfig)
-    val args = parseYarnArgList(config, mode)
+    val args = parseArgList(config, mode)
 
     val configurationDirectory = getConfigDir(config)
 
@@ -284,7 +287,7 @@ object FlinkShell {
     (executorConfig, None)
   }
 
-  def parseYarnArgList(config: Config, mode: String): Array[String] = {
+  def parseArgList(config: Config, mode: String): Array[String] = {
     val args = if (mode == "default") {
       ArrayBuffer[String]()
     } else {