You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/06/07 06:08:40 UTC

spark git commit: [SPARK-7169] [CORE] Allow metrics system to be configured through SparkConf.

Repository: spark
Updated Branches:
  refs/heads/master 5aa804f3c -> 18c4fcebb


[SPARK-7169] [CORE] Allow metrics system to be configured through SparkConf.

Author: Marcelo Vanzin <va...@cloudera.com>
Author: Jacek Lewandowski <le...@gmail.com>

Closes #6560 from vanzin/SPARK-7169 and squashes the following commits:

737266f [Marcelo Vanzin] Feedback.
702d5a3 [Marcelo Vanzin] Scalastyle.
ce66e7e [Marcelo Vanzin] Remove metrics config handling from SparkConf.
439938a [Jacek Lewandowski] SPARK-7169: Metrics can be additionally configured from Spark configuration


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18c4fceb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18c4fceb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18c4fceb

Branch: refs/heads/master
Commit: 18c4fcebbeecc3b26476a728bc9db62f5c0a6f87
Parents: 5aa804f
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Sat Jun 6 21:08:36 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Sat Jun 6 21:08:36 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/metrics/MetricsConfig.scala    | 55 ++++++++-----
 .../apache/spark/metrics/MetricsSystem.scala    |  3 +-
 .../spark/metrics/MetricsConfigSuite.scala      | 82 +++++++++++++++++++-
 3 files changed, 115 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/18c4fceb/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 8edf493..d749555 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -23,10 +23,10 @@ import java.util.Properties
 import scala.collection.mutable
 import scala.util.matching.Regex
 
-import org.apache.spark.Logging
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf}
 
-private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
+private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
 
   private val DEFAULT_PREFIX = "*"
   private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
@@ -46,23 +46,14 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
     // Add default properties in case there's no properties file
     setDefaultProperties(properties)
 
-    // If spark.metrics.conf is not set, try to get file in class path
-    val isOpt: Option[InputStream] = configFile.map(new FileInputStream(_)).orElse {
-      try {
-        Option(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME))
-      } catch {
-        case e: Exception =>
-          logError("Error loading default configuration file", e)
-          None
-      }
-    }
+    loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
 
-    isOpt.foreach { is =>
-      try {
-        properties.load(is)
-      } finally {
-        is.close()
-      }
+    // Also look for the properties in the provided Spark configuration
+    val prefix = "spark.metrics.conf."
+    conf.getAll.foreach {
+      case (k, v) if k.startsWith(prefix) =>
+        properties.setProperty(k.substring(prefix.length()), v)
+      case _ =>
     }
 
     propertyCategories = subProperties(properties, INSTANCE_REGEX)
@@ -97,5 +88,31 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
       case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
     }
   }
-}
 
+  /**
+   * Loads configuration from a config file. If no config file is provided, falls back to the
+   * default metrics file on the class path. Load failures are logged rather than thrown.
+   */
+  private[this] def loadPropertiesFromFile(path: Option[String]): Unit = {
+    var is: InputStream = null
+    try {
+      is = path match {
+        case Some(f) => new FileInputStream(f)
+        case None => Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
+      }
+
+      if (is != null) {
+        properties.load(is)
+      }
+    } catch {
+      case e: Exception =>
+        val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
+        logError(s"Error loading configuration file $file", e)
+    } finally {
+      if (is != null) {
+        is.close()
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/18c4fceb/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 9150ad3..ed5131c 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -70,8 +70,7 @@ private[spark] class MetricsSystem private (
     securityMgr: SecurityManager)
   extends Logging {
 
-  private[this] val confFile = conf.get("spark.metrics.conf", null)
-  private[this] val metricsConfig = new MetricsConfig(Option(confFile))
+  private[this] val metricsConfig = new MetricsConfig(conf)
 
   private val sinks = new mutable.ArrayBuffer[Sink]
   private val sources = new mutable.ArrayBuffer[Source]

http://git-wip-us.apache.org/repos/asf/spark/blob/18c4fceb/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
index a901a06..41f2ff7 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.metrics
 
+import org.apache.spark.SparkConf
+
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkFunSuite
@@ -29,7 +31,9 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("MetricsConfig with default properties") {
-    val conf = new MetricsConfig(None)
+    val sparkConf = new SparkConf(loadDefaults = false)
+    sparkConf.set("spark.metrics.conf", "dummy-file")
+    val conf = new MetricsConfig(sparkConf)
     conf.initialize()
 
     assert(conf.properties.size() === 4)
@@ -42,8 +46,41 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
     assert(property.getProperty("sink.servlet.path") === "/metrics/json")
   }
 
-  test("MetricsConfig with properties set") {
-    val conf = new MetricsConfig(Option(filePath))
+  test("MetricsConfig with properties set from a file") {
+    val sparkConf = new SparkConf(loadDefaults = false)
+    sparkConf.set("spark.metrics.conf", filePath)
+    val conf = new MetricsConfig(sparkConf)
+    conf.initialize()
+
+    val masterProp = conf.getInstance("master")
+    assert(masterProp.size() === 5)
+    assert(masterProp.getProperty("sink.console.period") === "20")
+    assert(masterProp.getProperty("sink.console.unit") === "minutes")
+    assert(masterProp.getProperty("source.jvm.class") ===
+      "org.apache.spark.metrics.source.JvmSource")
+    assert(masterProp.getProperty("sink.servlet.class") ===
+      "org.apache.spark.metrics.sink.MetricsServlet")
+    assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
+
+    val workerProp = conf.getInstance("worker")
+    assert(workerProp.size() === 5)
+    assert(workerProp.getProperty("sink.console.period") === "10")
+    assert(workerProp.getProperty("sink.console.unit") === "seconds")
+    assert(workerProp.getProperty("source.jvm.class") ===
+      "org.apache.spark.metrics.source.JvmSource")
+    assert(workerProp.getProperty("sink.servlet.class") ===
+      "org.apache.spark.metrics.sink.MetricsServlet")
+    assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
+  }
+
+  test("MetricsConfig with properties set from a Spark configuration") {
+    val sparkConf = new SparkConf(loadDefaults = false)
+    setMetricsProperty(sparkConf, "*.sink.console.period", "10")
+    setMetricsProperty(sparkConf, "*.sink.console.unit", "seconds")
+    setMetricsProperty(sparkConf, "*.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
+    setMetricsProperty(sparkConf, "master.sink.console.period", "20")
+    setMetricsProperty(sparkConf, "master.sink.console.unit", "minutes")
+    val conf = new MetricsConfig(sparkConf)
     conf.initialize()
 
     val masterProp = conf.getInstance("master")
@@ -67,8 +104,40 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
     assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
   }
 
+  test("MetricsConfig with properties set from a file and a Spark configuration") {
+    val sparkConf = new SparkConf(loadDefaults = false)
+    setMetricsProperty(sparkConf, "*.sink.console.period", "10")
+    setMetricsProperty(sparkConf, "*.sink.console.unit", "seconds")
+    setMetricsProperty(sparkConf, "*.source.jvm.class", "org.apache.spark.SomeOtherSource")
+    setMetricsProperty(sparkConf, "master.sink.console.period", "50")
+    setMetricsProperty(sparkConf, "master.sink.console.unit", "seconds")
+    sparkConf.set("spark.metrics.conf", filePath)
+    val conf = new MetricsConfig(sparkConf)
+    conf.initialize()
+
+    val masterProp = conf.getInstance("master")
+    assert(masterProp.size() === 5)
+    assert(masterProp.getProperty("sink.console.period") === "50")
+    assert(masterProp.getProperty("sink.console.unit") === "seconds")
+    assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.SomeOtherSource")
+    assert(masterProp.getProperty("sink.servlet.class") ===
+      "org.apache.spark.metrics.sink.MetricsServlet")
+    assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
+
+    val workerProp = conf.getInstance("worker")
+    assert(workerProp.size() === 5)
+    assert(workerProp.getProperty("sink.console.period") === "10")
+    assert(workerProp.getProperty("sink.console.unit") === "seconds")
+    assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.SomeOtherSource")
+    assert(workerProp.getProperty("sink.servlet.class") ===
+      "org.apache.spark.metrics.sink.MetricsServlet")
+    assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
+  }
+
   test("MetricsConfig with subProperties") {
-    val conf = new MetricsConfig(Option(filePath))
+    val sparkConf = new SparkConf(loadDefaults = false)
+    sparkConf.set("spark.metrics.conf", filePath)
+    val conf = new MetricsConfig(sparkConf)
     conf.initialize()
 
     val propCategories = conf.propertyCategories
@@ -90,4 +159,9 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
     val servletProps = sinkProps("servlet")
     assert(servletProps.size() === 2)
   }
+
+  private def setMetricsProperty(conf: SparkConf, name: String, value: String): Unit = {
+    conf.set(s"spark.metrics.conf.$name", value)
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org