You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/06/07 06:08:40 UTC
spark git commit: [SPARK-7169] [CORE] Allow metrics system to be configured through SparkConf.
Repository: spark
Updated Branches:
refs/heads/master 5aa804f3c -> 18c4fcebb
[SPARK-7169] [CORE] Allow metrics system to be configured through SparkConf.
Author: Marcelo Vanzin <va...@cloudera.com>
Author: Jacek Lewandowski <le...@gmail.com>
Closes #6560 from vanzin/SPARK-7169 and squashes the following commits:
737266f [Marcelo Vanzin] Feedback.
702d5a3 [Marcelo Vanzin] Scalastyle.
ce66e7e [Marcelo Vanzin] Remove metrics config handling from SparkConf.
439938a [Jacek Lewandowski] SPARK-7169: Metrics can be additionally configured from Spark configuration
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18c4fceb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18c4fceb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18c4fceb
Branch: refs/heads/master
Commit: 18c4fcebbeecc3b26476a728bc9db62f5c0a6f87
Parents: 5aa804f
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Sat Jun 6 21:08:36 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Sat Jun 6 21:08:36 2015 -0700
----------------------------------------------------------------------
.../apache/spark/metrics/MetricsConfig.scala | 55 ++++++++-----
.../apache/spark/metrics/MetricsSystem.scala | 3 +-
.../spark/metrics/MetricsConfigSuite.scala | 82 +++++++++++++++++++-
3 files changed, 115 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/18c4fceb/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 8edf493..d749555 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -23,10 +23,10 @@ import java.util.Properties
import scala.collection.mutable
import scala.util.matching.Regex
-import org.apache.spark.Logging
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf}
-private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
+private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
private val DEFAULT_PREFIX = "*"
private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
@@ -46,23 +46,14 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
// Add default properties in case there's no properties file
setDefaultProperties(properties)
- // If spark.metrics.conf is not set, try to get file in class path
- val isOpt: Option[InputStream] = configFile.map(new FileInputStream(_)).orElse {
- try {
- Option(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME))
- } catch {
- case e: Exception =>
- logError("Error loading default configuration file", e)
- None
- }
- }
+ loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
- isOpt.foreach { is =>
- try {
- properties.load(is)
- } finally {
- is.close()
- }
+ // Also look for the properties in provided Spark configuration
+ val prefix = "spark.metrics.conf."
+ conf.getAll.foreach {
+ case (k, v) if k.startsWith(prefix) =>
+ properties.setProperty(k.substring(prefix.length()), v)
+ case _ =>
}
propertyCategories = subProperties(properties, INSTANCE_REGEX)
@@ -97,5 +88,31 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
}
}
-}
+ /**
+ * Loads configuration from a config file. If no config file is provided, try to get file
+ * in class path.
+ */
+ private[this] def loadPropertiesFromFile(path: Option[String]): Unit = {
+ var is: InputStream = null
+ try {
+ is = path match {
+ case Some(f) => new FileInputStream(f)
+ case None => Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
+ }
+
+ if (is != null) {
+ properties.load(is)
+ }
+ } catch {
+ case e: Exception =>
+ val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
+ logError(s"Error loading configuration file $file", e)
+ } finally {
+ if (is != null) {
+ is.close()
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/18c4fceb/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 9150ad3..ed5131c 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -70,8 +70,7 @@ private[spark] class MetricsSystem private (
securityMgr: SecurityManager)
extends Logging {
- private[this] val confFile = conf.get("spark.metrics.conf", null)
- private[this] val metricsConfig = new MetricsConfig(Option(confFile))
+ private[this] val metricsConfig = new MetricsConfig(conf)
private val sinks = new mutable.ArrayBuffer[Sink]
private val sources = new mutable.ArrayBuffer[Source]
http://git-wip-us.apache.org/repos/asf/spark/blob/18c4fceb/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
index a901a06..41f2ff7 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.metrics
+import org.apache.spark.SparkConf
+
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkFunSuite
@@ -29,7 +31,9 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
}
test("MetricsConfig with default properties") {
- val conf = new MetricsConfig(None)
+ val sparkConf = new SparkConf(loadDefaults = false)
+ sparkConf.set("spark.metrics.conf", "dummy-file")
+ val conf = new MetricsConfig(sparkConf)
conf.initialize()
assert(conf.properties.size() === 4)
@@ -42,8 +46,41 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
assert(property.getProperty("sink.servlet.path") === "/metrics/json")
}
- test("MetricsConfig with properties set") {
- val conf = new MetricsConfig(Option(filePath))
+ test("MetricsConfig with properties set from a file") {
+ val sparkConf = new SparkConf(loadDefaults = false)
+ sparkConf.set("spark.metrics.conf", filePath)
+ val conf = new MetricsConfig(sparkConf)
+ conf.initialize()
+
+ val masterProp = conf.getInstance("master")
+ assert(masterProp.size() === 5)
+ assert(masterProp.getProperty("sink.console.period") === "20")
+ assert(masterProp.getProperty("sink.console.unit") === "minutes")
+ assert(masterProp.getProperty("source.jvm.class") ===
+ "org.apache.spark.metrics.source.JvmSource")
+ assert(masterProp.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
+
+ val workerProp = conf.getInstance("worker")
+ assert(workerProp.size() === 5)
+ assert(workerProp.getProperty("sink.console.period") === "10")
+ assert(workerProp.getProperty("sink.console.unit") === "seconds")
+ assert(workerProp.getProperty("source.jvm.class") ===
+ "org.apache.spark.metrics.source.JvmSource")
+ assert(workerProp.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
+ }
+
+ test("MetricsConfig with properties set from a Spark configuration") {
+ val sparkConf = new SparkConf(loadDefaults = false)
+ setMetricsProperty(sparkConf, "*.sink.console.period", "10")
+ setMetricsProperty(sparkConf, "*.sink.console.unit", "seconds")
+ setMetricsProperty(sparkConf, "*.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
+ setMetricsProperty(sparkConf, "master.sink.console.period", "20")
+ setMetricsProperty(sparkConf, "master.sink.console.unit", "minutes")
+ val conf = new MetricsConfig(sparkConf)
conf.initialize()
val masterProp = conf.getInstance("master")
@@ -67,8 +104,40 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
}
+ test("MetricsConfig with properties set from a file and a Spark configuration") {
+ val sparkConf = new SparkConf(loadDefaults = false)
+ setMetricsProperty(sparkConf, "*.sink.console.period", "10")
+ setMetricsProperty(sparkConf, "*.sink.console.unit", "seconds")
+ setMetricsProperty(sparkConf, "*.source.jvm.class", "org.apache.spark.SomeOtherSource")
+ setMetricsProperty(sparkConf, "master.sink.console.period", "50")
+ setMetricsProperty(sparkConf, "master.sink.console.unit", "seconds")
+ sparkConf.set("spark.metrics.conf", filePath)
+ val conf = new MetricsConfig(sparkConf)
+ conf.initialize()
+
+ val masterProp = conf.getInstance("master")
+ assert(masterProp.size() === 5)
+ assert(masterProp.getProperty("sink.console.period") === "50")
+ assert(masterProp.getProperty("sink.console.unit") === "seconds")
+ assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.SomeOtherSource")
+ assert(masterProp.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
+
+ val workerProp = conf.getInstance("worker")
+ assert(workerProp.size() === 5)
+ assert(workerProp.getProperty("sink.console.period") === "10")
+ assert(workerProp.getProperty("sink.console.unit") === "seconds")
+ assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.SomeOtherSource")
+ assert(workerProp.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
+ }
+
test("MetricsConfig with subProperties") {
- val conf = new MetricsConfig(Option(filePath))
+ val sparkConf = new SparkConf(loadDefaults = false)
+ sparkConf.set("spark.metrics.conf", filePath)
+ val conf = new MetricsConfig(sparkConf)
conf.initialize()
val propCategories = conf.propertyCategories
@@ -90,4 +159,9 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
val servletProps = sinkProps("servlet")
assert(servletProps.size() === 2)
}
+
+ private def setMetricsProperty(conf: SparkConf, name: String, value: String): Unit = {
+ conf.set(s"spark.metrics.conf.$name", value)
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org