Posted to commits@spark.apache.org by va...@apache.org on 2016/08/15 18:10:12 UTC

spark git commit: [SPARK-16671][CORE][SQL] Consolidate code to do variable substitution.

Repository: spark
Updated Branches:
  refs/heads/master 564fe614c -> 5da6c4b24


[SPARK-16671][CORE][SQL] Consolidate code to do variable substitution.

Both core and sql have slightly different code that does variable substitution
of config values. This change refactors that code and encapsulates the logic
of reading config values and expanding variables in a new helper class, which
can be configured so that both core and sql can use it without losing existing
functionality. The new class also allows for easier testing and makes it easier
to add more features in the future.

Tested with existing and new unit tests, and by running spark-shell with
some configs referencing variables and making sure it behaved as expected.
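
For illustration, the consolidated behavior looks roughly like this. This is a
minimal sketch, not part of the patch: the keys are made up, and ConfigReader /
SparkConfigProvider are private[spark], so it only shows the shape of the API.

    import scala.collection.JavaConverters._
    import org.apache.spark.internal.config._

    // Hypothetical keys and values, for illustration only.
    val settings = Map(
      "spark.app.greeting" -> "hello",
      "spark.app.message" -> "${spark.app.greeting} from ${env:USER}").asJava

    // Essentially the wiring SparkConf now uses: the default namespace resolves
    // Spark keys (including declared defaults), while "env:" and "system:"
    // references fall back to the built-in providers.
    val reader = new ConfigReader(new SparkConfigProvider(settings))

    reader.get("spark.app.message")
    // => Some("hello from ...") with both references expanded; references that
    //    cannot be resolved are left in place.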

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #14468 from vanzin/SPARK-16671.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5da6c4b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5da6c4b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5da6c4b2

Branch: refs/heads/master
Commit: 5da6c4b24f512b63cd4e6ba7dd8968066a9396f5
Parents: 564fe61
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Mon Aug 15 11:09:54 2016 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Aug 15 11:09:54 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |   9 +-
 .../spark/internal/config/ConfigEntry.scala     |  92 +++-------------
 .../spark/internal/config/ConfigProvider.scala  |  74 +++++++++++++
 .../spark/internal/config/ConfigReader.scala    | 106 +++++++++++++++++++
 .../internal/config/ConfigEntrySuite.scala      |  78 ++++++--------
 .../internal/config/ConfigReaderSuite.scala     |  62 +++++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala |   9 +-
 .../sql/internal/VariableSubstitution.scala     |  92 +++-------------
 .../internal/VariableSubstitutionSuite.scala    |  18 ----
 9 files changed, 312 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b6d244b..31b41d9 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.LinkedHashSet
 import org.apache.avro.{Schema, SchemaNormalization}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.internal.config._
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -56,6 +56,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
 
   private val settings = new ConcurrentHashMap[String, String]()
 
+  private val reader = new ConfigReader(new SparkConfigProvider(settings))
+  reader.bindEnv(new ConfigProvider {
+    override def get(key: String): Option[String] = Option(getenv(key))
+  })
+
   if (loadDefaults) {
     loadFromSystemProperties(false)
   }
@@ -248,7 +253,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
    * - This will throw an exception is the config is not optional and the value is not set.
    */
   private[spark] def get[T](entry: ConfigEntry[T]): T = {
-    entry.readFrom(settings, getenv)
+    entry.readFrom(reader)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
index e2e23b3..113037d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -26,22 +26,9 @@ import org.apache.spark.SparkConf
 /**
  * An entry contains all meta information for a configuration.
  *
- * Config options created using this feature support variable expansion. If the config value
- * contains variable references of the form "${prefix:variableName}", the reference will be replaced
- * with the value of the variable depending on the prefix. The prefix can be one of:
- *
- * - no prefix: if the config key starts with "spark", looks for the value in the Spark config
- * - system: looks for the value in the system properties
- * - env: looks for the value in the environment
- *
- * So referencing "${spark.master}" will look for the value of "spark.master" in the Spark
- * configuration, while referencing "${env:MASTER}" will read the value from the "MASTER"
- * environment variable.
- *
- * For known Spark configuration keys (i.e. those created using `ConfigBuilder`), references
- * will also consider the default value when it exists.
- *
- * If the reference cannot be resolved, the original string will be retained.
+ * When applying variable substitution to config values, only references starting with "spark." are
+ * considered in the default namespace. For known Spark configuration keys (i.e. those created using
+ * `ConfigBuilder`), references will also consider the default value when it exists.
  *
  * Variable expansion is also applied to the default values of config entries that have a default
  * value declared as a string.
@@ -72,7 +59,7 @@ private[spark] abstract class ConfigEntry[T] (
 
   def defaultValueString: String
 
-  def readFrom(conf: JMap[String, String], getenv: String => String): T
+  def readFrom(reader: ConfigReader): T
 
   def defaultValue: Option[T] = None
 
@@ -80,13 +67,6 @@ private[spark] abstract class ConfigEntry[T] (
     s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
   }
 
-  protected def readAndExpand(
-      conf: JMap[String, String],
-      getenv: String => String,
-      usedRefs: Set[String] = Set()): Option[String] = {
-    Option(conf.get(key)).map(expand(_, conf, getenv, usedRefs))
-  }
-
 }
 
 private class ConfigEntryWithDefault[T] (
@@ -102,8 +82,8 @@ private class ConfigEntryWithDefault[T] (
 
   override def defaultValueString: String = stringConverter(_defaultValue)
 
-  def readFrom(conf: JMap[String, String], getenv: String => String): T = {
-    readAndExpand(conf, getenv).map(valueConverter).getOrElse(_defaultValue)
+  def readFrom(reader: ConfigReader): T = {
+    reader.get(key).map(valueConverter).getOrElse(_defaultValue)
   }
 
 }
@@ -121,12 +101,9 @@ private class ConfigEntryWithDefaultString[T] (
 
   override def defaultValueString: String = _defaultValue
 
-  def readFrom(conf: JMap[String, String], getenv: String => String): T = {
-    Option(conf.get(key))
-      .orElse(Some(_defaultValue))
-      .map(ConfigEntry.expand(_, conf, getenv, Set()))
-      .map(valueConverter)
-      .get
+  def readFrom(reader: ConfigReader): T = {
+    val value = reader.get(key).getOrElse(reader.substitute(_defaultValue))
+    valueConverter(value)
   }
 
 }
@@ -146,8 +123,8 @@ private[spark] class OptionalConfigEntry[T](
 
   override def defaultValueString: String = "<undefined>"
 
-  override def readFrom(conf: JMap[String, String], getenv: String => String): Option[T] = {
-    readAndExpand(conf, getenv).map(rawValueConverter)
+  override def readFrom(reader: ConfigReader): Option[T] = {
+    reader.get(key).map(rawValueConverter)
   }
 
 }
@@ -164,18 +141,16 @@ private class FallbackConfigEntry[T] (
 
   override def defaultValueString: String = s"<value of ${fallback.key}>"
 
-  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
-    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+  override def readFrom(reader: ConfigReader): T = {
+    reader.get(key).map(valueConverter).getOrElse(fallback.readFrom(reader))
   }
 
 }
 
-private object ConfigEntry {
+private[spark] object ConfigEntry {
 
   private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
 
-  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
-
   def registerEntry(entry: ConfigEntry[_]): Unit = {
     val existing = knownConfigs.putIfAbsent(entry.key, entry)
     require(existing == null, s"Config entry ${entry.key} already registered!")
@@ -183,43 +158,4 @@ private object ConfigEntry {
 
   def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
 
-  /**
-   * Expand the `value` according to the rules explained in ConfigEntry.
-   */
-  def expand(
-      value: String,
-      conf: JMap[String, String],
-      getenv: String => String,
-      usedRefs: Set[String]): String = {
-    REF_RE.replaceAllIn(value, { m =>
-      val prefix = m.group(1)
-      val name = m.group(2)
-      val replacement = prefix match {
-        case null =>
-          require(!usedRefs.contains(name), s"Circular reference in $value: $name")
-          if (name.startsWith("spark.")) {
-            Option(findEntry(name))
-              .flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
-              .orElse(Option(conf.get(name)))
-              .orElse(defaultValueString(name))
-          } else {
-            None
-          }
-        case "system" => sys.props.get(name)
-        case "env" => Option(getenv(name))
-        case _ => None
-      }
-      Regex.quoteReplacement(replacement.getOrElse(m.matched))
-    })
-  }
-
-  private def defaultValueString(key: String): Option[String] = {
-    findEntry(key) match {
-      case e: ConfigEntryWithDefault[_] => Some(e.defaultValueString)
-      case e: ConfigEntryWithDefaultString[_] => Some(e.defaultValueString)
-      case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
-      case _ => None
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
new file mode 100644
index 0000000..4b546c8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.{Map => JMap}
+
+/**
+ * A source of configuration values.
+ */
+private[spark] trait ConfigProvider {
+
+  def get(key: String): Option[String]
+
+}
+
+private[spark] class EnvProvider extends ConfigProvider {
+
+  override def get(key: String): Option[String] = sys.env.get(key)
+
+}
+
+private[spark] class SystemProvider extends ConfigProvider {
+
+  override def get(key: String): Option[String] = sys.props.get(key)
+
+}
+
+private[spark] class MapProvider(conf: JMap[String, String]) extends ConfigProvider {
+
+  override def get(key: String): Option[String] = Option(conf.get(key))
+
+}
+
+/**
+ * A config provider that only reads Spark config keys, and considers default values for known
+ * configs when fetching configuration values.
+ */
+private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {
+
+  import ConfigEntry._
+
+  override def get(key: String): Option[String] = {
+    if (key.startsWith("spark.")) {
+      Option(conf.get(key)).orElse(defaultValueString(key))
+    } else {
+      None
+    }
+  }
+
+  private def defaultValueString(key: String): Option[String] = {
+    findEntry(key) match {
+      case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
+      case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
+      case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
+      case _ => None
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
new file mode 100644
index 0000000..bb1a3bb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.{Map => JMap}
+import java.util.regex.Pattern
+
+import scala.collection.mutable.HashMap
+import scala.util.matching.Regex
+
+private object ConfigReader {
+
+  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
+
+}
+
+/**
+ * A helper class for reading config entries and performing variable substitution.
+ *
+ * If a config value contains variable references of the form "${prefix:variableName}", the
+ * reference will be replaced with the value of the variable depending on the prefix. By default,
+ * the following prefixes are handled:
+ *
+ * - no prefix: use the default config provider
+ * - system: looks for the value in the system properties
+ * - env: looks for the value in the environment
+ *
+ * Different prefixes can be bound to a `ConfigProvider`, which is used to read configuration
+ * values from the data source for the prefix, and both the system and env providers can be
+ * overridden.
+ *
+ * If the reference cannot be resolved, the original string will be retained.
+ *
+ * @param conf The config provider for the default namespace (no prefix).
+ */
+private[spark] class ConfigReader(conf: ConfigProvider) {
+
+  def this(conf: JMap[String, String]) = this(new MapProvider(conf))
+
+  private val bindings = new HashMap[String, ConfigProvider]()
+  bind(null, conf)
+  bindEnv(new EnvProvider())
+  bindSystem(new SystemProvider())
+
+  /**
+   * Binds a prefix to a provider. This method is not thread-safe and should be called
+   * before the instance is used to expand values.
+   */
+  def bind(prefix: String, provider: ConfigProvider): ConfigReader = {
+    bindings(prefix) = provider
+    this
+  }
+
+  def bind(prefix: String, values: JMap[String, String]): ConfigReader = {
+    bind(prefix, new MapProvider(values))
+  }
+
+  def bindEnv(provider: ConfigProvider): ConfigReader = bind("env", provider)
+
+  def bindSystem(provider: ConfigProvider): ConfigReader = bind("system", provider)
+
+  /**
+   * Reads a configuration key from the default provider, and apply variable substitution.
+   */
+  def get(key: String): Option[String] = conf.get(key).map(substitute)
+
+  /**
+   * Perform variable substitution on the given input string.
+   */
+  def substitute(input: String): String = substitute(input, Set())
+
+  private def substitute(input: String, usedRefs: Set[String]): String = {
+    if (input != null) {
+      ConfigReader.REF_RE.replaceAllIn(input, { m =>
+        val prefix = m.group(1)
+        val name = m.group(2)
+        val ref = if (prefix == null) name else s"$prefix:$name"
+        require(!usedRefs.contains(ref), s"Circular reference in $input: $ref")
+
+        val replacement = bindings.get(prefix)
+          .flatMap(_.get(name))
+          .map { v => substitute(v, usedRefs + ref) }
+          .getOrElse(m.matched)
+        Regex.quoteReplacement(replacement)
+      })
+    } else {
+      input
+    }
+  }
+
+}
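
To make the class doc above concrete, here is a small sketch of binding an
extra prefix to a custom provider. The "vault" prefix and its values are
invented for this example; only the bind/substitute calls come from the new
class.

    import org.apache.spark.internal.config._

    // Hypothetical provider for a made-up "vault" namespace; the built-in
    // "env:" and "system:" bindings can be overridden the same way.
    val secrets = new ConfigProvider {
      override def get(key: String): Option[String] =
        Map("token" -> "s3cret").get(key)
    }

    val reader = new ConfigReader(new java.util.HashMap[String, String]())
      .bind("vault", secrets)

    reader.substitute("auth=${vault:token}")   // "auth=s3cret"
    reader.substitute("${vault:missing}")      // unresolved, returned unchanged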

http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index ebdb69f..91a96bd 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.util.SparkConfWithEnv
 
 class ConfigEntrySuite extends SparkFunSuite {
 
@@ -161,25 +162,9 @@ class ConfigEntrySuite extends SparkFunSuite {
     assert(conf.get(stringConf) === null)
   }
 
-  test("variable expansion") {
+  test("variable expansion of spark config entries") {
     val env = Map("ENV1" -> "env1")
-    val conf = HashMap("spark.value1" -> "value1", "spark.value2" -> "value2")
-
-    def getenv(key: String): String = env.getOrElse(key, null)
-
-    def expand(value: String): String = ConfigEntry.expand(value, conf.asJava, getenv, Set())
-
-    assert(expand("${spark.value1}") === "value1")
-    assert(expand("spark.value1 is: ${spark.value1}") === "spark.value1 is: value1")
-    assert(expand("${spark.value1} ${spark.value2}") === "value1 value2")
-    assert(expand("${spark.value3}") === "${spark.value3}")
-
-    // Make sure anything that is not in the "spark." namespace is ignored.
-    conf("notspark.key") = "value"
-    assert(expand("${notspark.key}") === "${notspark.key}")
-
-    assert(expand("${env:ENV1}") === "env1")
-    assert(expand("${system:user.name}") === sys.props("user.name"))
+    val conf = new SparkConfWithEnv(env)
 
     val stringConf = ConfigBuilder(testKey("stringForExpansion"))
       .stringConf
@@ -193,45 +178,44 @@ class ConfigEntrySuite extends SparkFunSuite {
     val fallbackConf = ConfigBuilder(testKey("fallbackForExpansion"))
       .fallbackConf(intConf)
 
-    assert(expand("${" + stringConf.key + "}") === "string1")
-    assert(expand("${" + optionalConf.key + "}") === "${" + optionalConf.key + "}")
-    assert(expand("${" + intConf.key + "}") === "42")
-    assert(expand("${" + fallbackConf.key + "}") === "42")
-
-    conf(optionalConf.key) = "string2"
-    assert(expand("${" + optionalConf.key + "}") === "string2")
+    val refConf = ConfigBuilder(testKey("configReferenceTest"))
+      .stringConf
+      .createWithDefault(null)
 
-    conf(fallbackConf.key) = "84"
-    assert(expand("${" + fallbackConf.key + "}") === "84")
+    def ref(entry: ConfigEntry[_]): String = "${" + entry.key + "}"
 
-    assert(expand("${spark.value1") === "${spark.value1")
+    def testEntryRef(entry: ConfigEntry[_], expected: String): Unit = {
+      conf.set(refConf, ref(entry))
+      assert(conf.get(refConf) === expected)
+    }
 
-    // Unknown prefixes.
-    assert(expand("${unknown:value}") === "${unknown:value}")
+    testEntryRef(stringConf, "string1")
+    testEntryRef(intConf, "42")
+    testEntryRef(fallbackConf, "42")
 
-    // Chained references.
-    val conf1 = ConfigBuilder(testKey("conf1"))
-      .stringConf
-      .createWithDefault("value1")
-    val conf2 = ConfigBuilder(testKey("conf2"))
-      .stringConf
-      .createWithDefault("value2")
+    testEntryRef(optionalConf, ref(optionalConf))
 
-    conf(conf2.key) = "${" + conf1.key + "}"
-    assert(expand("${" + conf2.key + "}") === conf1.defaultValueString)
+    conf.set(optionalConf, ref(stringConf))
+    testEntryRef(optionalConf, "string1")
 
-    // Circular references.
-    conf(conf1.key) = "${" + conf2.key + "}"
-    val e = intercept[IllegalArgumentException] {
-      expand("${" + conf2.key + "}")
-    }
-    assert(e.getMessage().contains("Circular"))
+    conf.set(optionalConf, ref(fallbackConf))
+    testEntryRef(optionalConf, "42")
 
     // Default string values with variable references.
     val parameterizedStringConf = ConfigBuilder(testKey("stringWithParams"))
       .stringConf
-      .createWithDefault("${spark.value1}")
-    assert(parameterizedStringConf.readFrom(conf.asJava, getenv) === conf("spark.value1"))
+      .createWithDefault(ref(stringConf))
+    assert(conf.get(parameterizedStringConf) === conf.get(stringConf))
+
+    // Make sure SparkConf's env override works.
+    conf.set(refConf, "${env:ENV1}")
+    assert(conf.get(refConf) === env("ENV1"))
+
+    // Conf with null default value is not expanded.
+    val nullConf = ConfigBuilder(testKey("nullString"))
+      .stringConf
+      .createWithDefault(null)
+    testEntryRef(nullConf, ref(nullConf))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala
new file mode 100644
index 0000000..be57cc3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigReaderSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+
+class ConfigReaderSuite extends SparkFunSuite {
+
+  test("variable expansion") {
+    val env = Map("ENV1" -> "env1")
+    val conf = Map("key1" -> "value1", "key2" -> "value2")
+
+    val reader = new ConfigReader(conf.asJava)
+    reader.bindEnv(new MapProvider(env.asJava))
+
+    assert(reader.substitute(null) === null)
+    assert(reader.substitute("${key1}") === "value1")
+    assert(reader.substitute("key1 is: ${key1}") === "key1 is: value1")
+    assert(reader.substitute("${key1} ${key2}") === "value1 value2")
+    assert(reader.substitute("${key3}") === "${key3}")
+    assert(reader.substitute("${env:ENV1}") === "env1")
+    assert(reader.substitute("${system:user.name}") === sys.props("user.name"))
+    assert(reader.substitute("${key1") === "${key1")
+
+    // Unknown prefixes.
+    assert(reader.substitute("${unknown:value}") === "${unknown:value}")
+  }
+
+  test("circular references") {
+    val conf = Map("key1" -> "${key2}", "key2" -> "${key1}")
+    val reader = new ConfigReader(conf.asJava)
+    val e = intercept[IllegalArgumentException] {
+      reader.substitute("${key1}")
+    }
+    assert(e.getMessage().contains("Circular"))
+  }
+
+  test("spark conf provider filters config keys") {
+    val conf = Map("nonspark.key" -> "value", "spark.key" -> "value")
+    val reader = new ConfigReader(new SparkConfigProvider(conf.asJava))
+    assert(reader.get("nonspark.key") === None)
+    assert(reader.get("spark.key") === Some("value"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b867a65..f2b1afd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -496,7 +496,8 @@ object SQLConf {
 
   val VARIABLE_SUBSTITUTE_DEPTH =
     SQLConfigBuilder("spark.sql.variable.substitute.depth")
-      .doc("The maximum replacements the substitution engine will do.")
+      .internal()
+      .doc("Deprecated: The maximum replacements the substitution engine will do.")
       .intConf
       .createWithDefault(40)
 
@@ -565,6 +566,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
     new java.util.HashMap[String, String]())
 
+  @transient private val reader = new ConfigReader(settings)
+
   /** ************************ Spark SQL Params/Hints ******************* */
 
   def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
@@ -739,7 +742,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
    */
   def getConf[T](entry: ConfigEntry[T]): T = {
     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
-    entry.readFrom(settings, System.getenv)
+    entry.readFrom(reader)
   }
 
   /**
@@ -748,7 +751,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
    */
   def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
-    entry.readFrom(settings, System.getenv)
+    entry.readFrom(reader)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
index 0982f1d..50725a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
 
 import java.util.regex.Pattern
 
+import org.apache.spark.internal.config._
 import org.apache.spark.sql.AnalysisException
 
 /**
@@ -29,93 +30,24 @@ import org.apache.spark.sql.AnalysisException
  */
 class VariableSubstitution(conf: SQLConf) {
 
-  private val pattern = Pattern.compile("\\$\\{[^\\}\\$ ]+\\}")
+  private val provider = new ConfigProvider {
+    override def get(key: String): Option[String] = Option(conf.getConfString(key, ""))
+  }
+
+  private val reader = new ConfigReader(provider)
+    .bind("spark", provider)
+    .bind("sparkconf", provider)
+    .bind("hiveconf", provider)
 
   /**
    * Given a query, does variable substitution and return the result.
    */
   def substitute(input: String): String = {
-    // Note that this function is mostly copied from Hive's SystemVariables, so the style is
-    // very Java/Hive like.
-    if (input eq null) {
-      return null
-    }
-
-    if (!conf.variableSubstituteEnabled) {
-      return input
-    }
-
-    var eval = input
-    val depth = conf.variableSubstituteDepth
-    val builder = new StringBuilder
-    val m = pattern.matcher("")
-
-    var s = 0
-    while (s <= depth) {
-      m.reset(eval)
-      builder.setLength(0)
-
-      var prev = 0
-      var found = false
-      while (m.find(prev)) {
-        val group = m.group()
-        var substitute = substituteVariable(group.substring(2, group.length - 1))
-        if (substitute.isEmpty) {
-          substitute = group
-        } else {
-          found = true
-        }
-        builder.append(eval.substring(prev, m.start())).append(substitute)
-        prev = m.end()
-      }
-
-      if (!found) {
-        return eval
-      }
-
-      builder.append(eval.substring(prev))
-      eval = builder.toString
-      s += 1
-    }
-
-    if (s > depth) {
-      throw new AnalysisException(
-        "Variable substitution depth is deeper than " + depth + " for input " + input)
+    if (conf.variableSubstituteEnabled) {
+      reader.substitute(input)
     } else {
-      return eval
+      input
     }
   }
 
-  /**
-   * Given a variable, replaces with the substitute value (default to "").
-   */
-  private def substituteVariable(variable: String): String = {
-    var value: String = null
-
-    if (variable.startsWith("system:")) {
-      value = System.getProperty(variable.substring("system:".length()))
-    }
-
-    if (value == null && variable.startsWith("env:")) {
-      value = System.getenv(variable.substring("env:".length()))
-    }
-
-    if (value == null && conf != null && variable.startsWith("hiveconf:")) {
-      value = conf.getConfString(variable.substring("hiveconf:".length()), "")
-    }
-
-    if (value == null && conf != null && variable.startsWith("sparkconf:")) {
-      value = conf.getConfString(variable.substring("sparkconf:".length()), "")
-    }
-
-    if (value == null && conf != null && variable.startsWith("spark:")) {
-      value = conf.getConfString(variable.substring("spark:".length()), "")
-    }
-
-    if (value == null && conf != null) {
-      value = conf.getConfString(variable, "")
-    }
-
-    value
-  }
 }
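
The net effect on the SQL side can be sketched as follows, as if written inside
org.apache.spark.sql.internal (like the suite below). The keys are made up and
both classes are internal, so this mirrors VariableSubstitutionSuite rather
than a public API.

    // Illustrative keys only; substitution is on by default
    // (conf.variableSubstituteEnabled).
    val conf = new SQLConf
    conf.setConfString("bar", "1")
    val sub = new VariableSubstitution(conf)

    sub.substitute("select ${bar}")            // "select 1"
    sub.substitute("select ${hiveconf:bar}")   // "select 1" (prefix bound to the same conf)
    sub.substitute("select ${sparkconf:bar}")  // "select 1"
    sub.substitute("select ${env:USER}")       // current user, via the default env binding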

http://git-wip-us.apache.org/repos/asf/spark/blob/5da6c4b2/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
index deac959..d5a946a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/VariableSubstitutionSuite.scala
@@ -57,22 +57,4 @@ class VariableSubstitutionSuite extends SparkFunSuite {
     assert(sub.substitute(q) == "select 1 1 this is great")
   }
 
-  test("depth limit") {
-    val q = "select ${bar} ${foo} ${doo}"
-    conf.setConfString(SQLConf.VARIABLE_SUBSTITUTE_DEPTH.key, "2")
-
-    // This should be OK since it is not nested.
-    conf.setConfString("bar", "1")
-    conf.setConfString("foo", "2")
-    conf.setConfString("doo", "3")
-    assert(sub.substitute(q) == "select 1 2 3")
-
-    // This should not be OK since it is nested in 3 levels.
-    conf.setConfString("bar", "1")
-    conf.setConfString("foo", "${bar}")
-    conf.setConfString("doo", "${foo}")
-    intercept[AnalysisException] {
-      sub.substitute(q)
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org