You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by js...@apache.org on 2017/08/31 00:57:26 UTC

spark git commit: [SPARK-11574][CORE] Add metrics StatsD sink

Repository: spark
Updated Branches:
  refs/heads/master 313c6ca43 -> cd5d0f337


[SPARK-11574][CORE] Add metrics StatsD sink

This patch adds statsd sink to the current metrics system in spark core.

Author: Xiaofeng Lin <xl...@twilio.com>

Closes #9518 from xflin/statsd.

Change-Id: Ib8720e86223d4a650df53f51ceb963cd95b49a44


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd5d0f33
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd5d0f33
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd5d0f33

Branch: refs/heads/master
Commit: cd5d0f3379b1a9fa0940ffd98bfff33f8cbcdeb0
Parents: 313c6ca
Author: Xiaofeng Lin <xl...@twilio.com>
Authored: Thu Aug 31 08:57:15 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Thu Aug 31 08:57:15 2017 +0800

----------------------------------------------------------------------
 conf/metrics.properties.template                |  12 ++
 .../spark/metrics/sink/StatsdReporter.scala     | 163 +++++++++++++++++++
 .../apache/spark/metrics/sink/StatsdSink.scala  |  75 +++++++++
 .../spark/metrics/sink/StatsdSinkSuite.scala    | 161 ++++++++++++++++++
 docs/monitoring.md                              |   1 +
 5 files changed, 412 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d0f33/conf/metrics.properties.template
----------------------------------------------------------------------
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index aeb76c9..4c008a1 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -118,6 +118,14 @@
 #   prefix    EMPTY STRING  Prefix to prepend to every metric's name
 #   protocol  tcp           Protocol ("tcp" or "udp") to use
 
+# org.apache.spark.metrics.sink.StatsdSink
+#   Name:     Default:      Description:
+#   host      127.0.0.1     Hostname or IP of StatsD server
+#   port      8125          Port of StatsD server
+#   period    10            Poll period
+#   unit      seconds       Units of poll period
+#   prefix    EMPTY STRING  Prefix to prepend to metric name
+
 ## Examples
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
@@ -125,6 +133,10 @@
 # Enable ConsoleSink for all instances by class name
 #*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
 
+# Enable StatsdSink for all instances by class name
+#*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
+#*.sink.statsd.prefix=spark
+
 # Polling period for the ConsoleSink
 #*.sink.console.period=10
 # Unit of the polling period for the ConsoleSink

http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d0f33/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
new file mode 100644
index 0000000..ba75aa1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.io.IOException
+import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.SortedMap
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import com.codahale.metrics._
+import org.apache.hadoop.net.NetUtils
+
+import org.apache.spark.internal.Logging
+
+/**
+ * @see <a href="https://github.com/etsy/statsd/blob/master/docs/metric_types.md">
+ *        StatsD metric types</a>
+ */
+private[spark] object StatsdMetricType {
+  val COUNTER = "c"
+  val GAUGE = "g"
+  val TIMER = "ms"
+  val Set = "s"
+}
+
+private[spark] class StatsdReporter(
+    registry: MetricRegistry,
+    host: String = "127.0.0.1",
+    port: Int = 8125,
+    prefix: String = "",
+    filter: MetricFilter = MetricFilter.ALL,
+    rateUnit: TimeUnit = TimeUnit.SECONDS,
+    durationUnit: TimeUnit = TimeUnit.MILLISECONDS)
+  extends ScheduledReporter(registry, "statsd-reporter", filter, rateUnit, durationUnit)
+  with Logging {
+
+  import StatsdMetricType._
+
+  private val address = new InetSocketAddress(host, port)
+  private val whitespace = "[\\s]+".r
+
+  override def report(
+      gauges: SortedMap[String, Gauge[_]],
+      counters: SortedMap[String, Counter],
+      histograms: SortedMap[String, Histogram],
+      meters: SortedMap[String, Meter],
+      timers: SortedMap[String, Timer]): Unit =
+    Try(new DatagramSocket) match {
+      case Failure(ioe: IOException) => logWarning("StatsD datagram socket construction failed",
+        NetUtils.wrapException(host, port, NetUtils.getHostname(), 0, ioe))
+      case Failure(e) => logWarning("StatsD datagram socket construction failed", e)
+      case Success(s) =>
+        implicit val socket = s
+        val localAddress = Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null)
+        val localPort = socket.getLocalPort
+        Try {
+          gauges.entrySet.asScala.foreach(e => reportGauge(e.getKey, e.getValue))
+          counters.entrySet.asScala.foreach(e => reportCounter(e.getKey, e.getValue))
+          histograms.entrySet.asScala.foreach(e => reportHistogram(e.getKey, e.getValue))
+          meters.entrySet.asScala.foreach(e => reportMetered(e.getKey, e.getValue))
+          timers.entrySet.asScala.foreach(e => reportTimer(e.getKey, e.getValue))
+        } recover {
+          case ioe: IOException =>
+            logDebug(s"Unable to send packets to StatsD", NetUtils.wrapException(
+              address.getHostString, address.getPort, localAddress, localPort, ioe))
+          case e: Throwable => logDebug(s"Unable to send packets to StatsD at '$host:$port'", e)
+        }
+        Try(socket.close()) recover {
+          case ioe: IOException =>
+            logDebug("Error when close socket to StatsD", NetUtils.wrapException(
+              address.getHostString, address.getPort, localAddress, localPort, ioe))
+          case e: Throwable => logDebug("Error when close socket to StatsD", e)
+        }
+    }
+
+  private def reportGauge(name: String, gauge: Gauge[_])(implicit socket: DatagramSocket): Unit =
+    formatAny(gauge.getValue).foreach(v => send(fullName(name), v, GAUGE))
+
+  private def reportCounter(name: String, counter: Counter)(implicit socket: DatagramSocket): Unit =
+    send(fullName(name), format(counter.getCount), COUNTER)
+
+  private def reportHistogram(name: String, histogram: Histogram)
+      (implicit socket: DatagramSocket): Unit = {
+    val snapshot = histogram.getSnapshot
+    send(fullName(name, "count"), format(histogram.getCount), GAUGE)
+    send(fullName(name, "max"), format(snapshot.getMax), TIMER)
+    send(fullName(name, "mean"), format(snapshot.getMean), TIMER)
+    send(fullName(name, "min"), format(snapshot.getMin), TIMER)
+    send(fullName(name, "stddev"), format(snapshot.getStdDev), TIMER)
+    send(fullName(name, "p50"), format(snapshot.getMedian), TIMER)
+    send(fullName(name, "p75"), format(snapshot.get75thPercentile), TIMER)
+    send(fullName(name, "p95"), format(snapshot.get95thPercentile), TIMER)
+    send(fullName(name, "p98"), format(snapshot.get98thPercentile), TIMER)
+    send(fullName(name, "p99"), format(snapshot.get99thPercentile), TIMER)
+    send(fullName(name, "p999"), format(snapshot.get999thPercentile), TIMER)
+  }
+
+  private def reportMetered(name: String, meter: Metered)(implicit socket: DatagramSocket): Unit = {
+    send(fullName(name, "count"), format(meter.getCount), GAUGE)
+    send(fullName(name, "m1_rate"), format(convertRate(meter.getOneMinuteRate)), TIMER)
+    send(fullName(name, "m5_rate"), format(convertRate(meter.getFiveMinuteRate)), TIMER)
+    send(fullName(name, "m15_rate"), format(convertRate(meter.getFifteenMinuteRate)), TIMER)
+    send(fullName(name, "mean_rate"), format(convertRate(meter.getMeanRate)), TIMER)
+  }
+
+  private def reportTimer(name: String, timer: Timer)(implicit socket: DatagramSocket): Unit = {
+    val snapshot = timer.getSnapshot
+    send(fullName(name, "max"), format(convertDuration(snapshot.getMax)), TIMER)
+    send(fullName(name, "mean"), format(convertDuration(snapshot.getMean)), TIMER)
+    send(fullName(name, "min"), format(convertDuration(snapshot.getMin)), TIMER)
+    send(fullName(name, "stddev"), format(convertDuration(snapshot.getStdDev)), TIMER)
+    send(fullName(name, "p50"), format(convertDuration(snapshot.getMedian)), TIMER)
+    send(fullName(name, "p75"), format(convertDuration(snapshot.get75thPercentile)), TIMER)
+    send(fullName(name, "p95"), format(convertDuration(snapshot.get95thPercentile)), TIMER)
+    send(fullName(name, "p98"), format(convertDuration(snapshot.get98thPercentile)), TIMER)
+    send(fullName(name, "p99"), format(convertDuration(snapshot.get99thPercentile)), TIMER)
+    send(fullName(name, "p999"), format(convertDuration(snapshot.get999thPercentile)), TIMER)
+
+    reportMetered(name, timer)
+  }
+
+  private def send(name: String, value: String, metricType: String)
+      (implicit socket: DatagramSocket): Unit = {
+    val bytes = sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
+    val packet = new DatagramPacket(bytes, bytes.length, address)
+    socket.send(packet)
+  }
+
+  private def fullName(names: String*): String = MetricRegistry.name(prefix, names : _*)
+
+  private def sanitize(s: String): String = whitespace.replaceAllIn(s, "-")
+
+  private def format(v: Any): String = formatAny(v).getOrElse("")
+
+  private def formatAny(v: Any): Option[String] =
+    v match {
+      case f: Float => Some("%2.2f".format(f))
+      case d: Double => Some("%2.2f".format(d))
+      case b: BigDecimal => Some("%2.2f".format(b))
+      case n: Number => Some(v.toString)
+      case _ => None
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d0f33/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
new file mode 100644
index 0000000..859a2f6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.MetricRegistry
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.MetricsSystem
+
+private[spark] object StatsdSink {
+  val STATSD_KEY_HOST = "host"
+  val STATSD_KEY_PORT = "port"
+  val STATSD_KEY_PERIOD = "period"
+  val STATSD_KEY_UNIT = "unit"
+  val STATSD_KEY_PREFIX = "prefix"
+
+  val STATSD_DEFAULT_HOST = "127.0.0.1"
+  val STATSD_DEFAULT_PORT = "8125"
+  val STATSD_DEFAULT_PERIOD = "10"
+  val STATSD_DEFAULT_UNIT = "SECONDS"
+  val STATSD_DEFAULT_PREFIX = ""
+}
+
+private[spark] class StatsdSink(
+    val property: Properties,
+    val registry: MetricRegistry,
+    securityMgr: SecurityManager)
+  extends Sink with Logging {
+  import StatsdSink._
+
+  val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST)
+  val port = property.getProperty(STATSD_KEY_PORT, STATSD_DEFAULT_PORT).toInt
+
+  val pollPeriod = property.getProperty(STATSD_KEY_PERIOD, STATSD_DEFAULT_PERIOD).toInt
+  val pollUnit =
+    TimeUnit.valueOf(property.getProperty(STATSD_KEY_UNIT, STATSD_DEFAULT_UNIT).toUpperCase)
+
+  val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX)
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  val reporter = new StatsdReporter(registry, host, port, prefix)
+
+  override def start(): Unit = {
+    reporter.start(pollPeriod, pollUnit)
+    logInfo(s"StatsdSink started with prefix: '$prefix'")
+  }
+
+  override def stop(): Unit = {
+    reporter.stop()
+    logInfo("StatsdSink stopped.")
+  }
+
+  override def report(): Unit = reporter.report()
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d0f33/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
new file mode 100644
index 0000000..0e21a36
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.net.{DatagramPacket, DatagramSocket}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Properties
+import java.util.concurrent.TimeUnit._
+
+import com.codahale.metrics._
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.metrics.sink.StatsdSink._
+
+class StatsdSinkSuite extends SparkFunSuite {
+  private val securityMgr = new SecurityManager(new SparkConf(false))
+  private val defaultProps = Map(
+    STATSD_KEY_PREFIX -> "spark",
+    STATSD_KEY_PERIOD -> "1",
+    STATSD_KEY_UNIT -> "seconds",
+    STATSD_KEY_HOST -> "127.0.0.1"
+  )
+  private val socketTimeout = 30000 // milliseconds
+  private val socketBufferSize = 8192
+
+  private def withSocketAndSink(testCode: (DatagramSocket, StatsdSink) => Any): Unit = {
+    val socket = new DatagramSocket
+    socket.setReceiveBufferSize(socketBufferSize)
+    socket.setSoTimeout(socketTimeout)
+    val props = new Properties
+    defaultProps.foreach(e => props.put(e._1, e._2))
+    props.put(STATSD_KEY_PORT, socket.getLocalPort.toString)
+    val registry = new MetricRegistry
+    val sink = new StatsdSink(props, registry, securityMgr)
+    try {
+      testCode(socket, sink)
+    } finally {
+      socket.close()
+    }
+  }
+
+  test("metrics StatsD sink with Counter") {
+    withSocketAndSink { (socket, sink) =>
+      val counter = new Counter
+      counter.inc(12)
+      sink.registry.register("counter", counter)
+      sink.report()
+
+      val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+      socket.receive(p)
+
+      val result = new String(p.getData, 0, p.getLength, UTF_8)
+      assert(result === "spark.counter:12|c", "Counter metric received should match data sent")
+    }
+  }
+
+  test("metrics StatsD sink with Gauge") {
+    withSocketAndSink { (socket, sink) =>
+      val gauge = new Gauge[Double] {
+        override def getValue: Double = 1.23
+      }
+      sink.registry.register("gauge", gauge)
+      sink.report()
+
+      val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+      socket.receive(p)
+
+      val result = new String(p.getData, 0, p.getLength, UTF_8)
+      assert(result === "spark.gauge:1.23|g", "Gauge metric received should match data sent")
+    }
+  }
+
+  test("metrics StatsD sink with Histogram") {
+    withSocketAndSink { (socket, sink) =>
+      val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+      val histogram = new Histogram(new UniformReservoir)
+      histogram.update(10)
+      histogram.update(20)
+      histogram.update(30)
+      sink.registry.register("histogram", histogram)
+      sink.report()
+
+      val expectedResults = Set(
+        "spark.histogram.count:3|g",
+        "spark.histogram.max:30|ms",
+        "spark.histogram.mean:20.00|ms",
+        "spark.histogram.min:10|ms",
+        "spark.histogram.stddev:10.00|ms",
+        "spark.histogram.p50:20.00|ms",
+        "spark.histogram.p75:30.00|ms",
+        "spark.histogram.p95:30.00|ms",
+        "spark.histogram.p98:30.00|ms",
+        "spark.histogram.p99:30.00|ms",
+        "spark.histogram.p999:30.00|ms"
+      )
+
+      (1 to expectedResults.size).foreach { i =>
+        socket.receive(p)
+        val result = new String(p.getData, 0, p.getLength, UTF_8)
+        logInfo(s"Received histogram result $i: '$result'")
+        assert(expectedResults.contains(result),
+          "Histogram metric received should match data sent")
+      }
+    }
+  }
+
+  test("metrics StatsD sink with Timer") {
+    withSocketAndSink { (socket, sink) =>
+      val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+      val timer = new Timer()
+      timer.update(1, SECONDS)
+      timer.update(2, SECONDS)
+      timer.update(3, SECONDS)
+      sink.registry.register("timer", timer)
+      sink.report()
+
+      val expectedResults = Set(
+        "spark.timer.max:3000.00|ms",
+        "spark.timer.mean:2000.00|ms",
+        "spark.timer.min:1000.00|ms",
+        "spark.timer.stddev:816.50|ms",
+        "spark.timer.p50:2000.00|ms",
+        "spark.timer.p75:3000.00|ms",
+        "spark.timer.p95:3000.00|ms",
+        "spark.timer.p98:3000.00|ms",
+        "spark.timer.p99:3000.00|ms",
+        "spark.timer.p999:3000.00|ms",
+        "spark.timer.count:3|g",
+        "spark.timer.m1_rate:0.00|ms",
+        "spark.timer.m5_rate:0.00|ms",
+        "spark.timer.m15_rate:0.00|ms"
+      )
+      // mean rate varies on each test run
+      val oneMoreResult = """spark.timer.mean_rate:\d+\.\d\d\|ms"""
+
+      (1 to (expectedResults.size + 1)).foreach { i =>
+        socket.receive(p)
+        val result = new String(p.getData, 0, p.getLength, UTF_8)
+        logInfo(s"Received timer result $i: '$result'")
+        assert(expectedResults.contains(result) || result.matches(oneMoreResult),
+          "Timer metric received should match data sent")
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d0f33/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index d22cd94..51084a2 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -455,6 +455,7 @@ Each instance can report to zero or more _sinks_. Sinks are contained in the
 * `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
 * `GraphiteSink`: Sends metrics to a Graphite node.
 * `Slf4jSink`: Sends metrics to slf4j as log entries.
+* `StatsdSink`: Sends metrics to a StatsD node.
 
 Spark also supports a Ganglia sink which is not included in the default build due to
 licensing restrictions:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org