You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by va...@apache.org on 2019/04/11 16:29:55 UTC
[incubator-livy] branch master updated: [LIVY-563] Propagate RSC
configuration when creating sessions.
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 5abc043 [LIVY-563] Propagate RSC configuration when creating sessions.
5abc043 is described below
commit 5abc043708b443ba36612a4f4e2f5137bd63621a
Author: Marcelo Vanzin <va...@cloudera.com>
AuthorDate: Thu Apr 11 09:29:39 2019 -0700
[LIVY-563] Propagate RSC configuration when creating sessions.
Even though not all RSC configs apply to the remote driver, a few do,
so propagate all of them when starting a new session.
Includes new unit test.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #168 from vanzin/LIVY-563.
---
rsc/src/main/java/org/apache/livy/rsc/RSCConf.java | 2 +-
.../livy/server/interactive/InteractiveSession.scala | 9 +++++++++
.../livy/server/interactive/InteractiveSessionSpec.scala | 14 +++++++++++++-
3 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
index 7c76164..d2496b5 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
@@ -35,7 +35,7 @@ public class RSCConf extends ClientConf<RSCConf> {
public static final String SPARK_CONF_PREFIX = "spark.";
public static final String LIVY_SPARK_PREFIX = SPARK_CONF_PREFIX + "__livy__.";
- private static final String RSC_CONF_PREFIX = "livy.rsc.";
+ public static final String RSC_CONF_PREFIX = "livy.rsc.";
public static enum Entry implements ConfEntry {
CLIENT_ID("client.auth.id", null),
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 6ec2d75..9529ed3 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -346,6 +346,15 @@ object InteractiveSession extends Logging {
mergeHiveSiteAndHiveDeps(sparkMajorVersion)
}
+ // Pick all the RSC-specific configs that have not been explicitly set otherwise, and
+ // put them in the resulting properties, so that the remote driver can use them.
+ livyConf.iterator().asScala.foreach { e =>
+ val (key, value) = (e.getKey(), e.getValue())
+ if (key.startsWith(RSCConf.RSC_CONF_PREFIX) && !builderProperties.contains(key)) {
+ builderProperties(key) = value
+ }
+ }
+
builderProperties
}
}
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index 2a99abb..2e21483 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -130,7 +130,6 @@ class InteractiveSessionSpec extends FunSpec
"dummy.jar"))
}
-
it("should set rsc jars through livy conf") {
val rscJars = Set(
"dummy.jar",
@@ -177,6 +176,19 @@ class InteractiveSessionSpec extends FunSpec
session.state should (be(SessionState.Starting) or be(SessionState.Idle))
}
+ it("should propagate RSC configuration properties") {
+ val livyConf = new LivyConf(false)
+ .set(LivyConf.REPL_JARS, "dummy.jar")
+ .set(RSCConf.Entry.SASL_QOP.key(), "foo")
+ .set(RSCConf.Entry.RPC_CHANNEL_LOG_LEVEL.key(), "TRACE")
+ .set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+ .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10")
+
+ val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark, livyConf)
+ assert(properties(RSCConf.Entry.SASL_QOP.key()) === "foo")
+ assert(properties(RSCConf.Entry.RPC_CHANNEL_LOG_LEVEL.key()) === "TRACE")
+ }
+
withSession("should execute `1 + 2` == 3") { session =>
val pyResult = executeStatement("1 + 2", Some("pyspark"))
pyResult should equal (Extraction.decompose(Map(