You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2018/12/22 15:03:19 UTC
[spark] branch master updated: [SPARK-26285][CORE] accumulator metrics sources for LongAccumulator and Doub…
This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0a02d5c [SPARK-26285][CORE] accumulator metrics sources for LongAccumulator and Doub…
0a02d5c is described below
commit 0a02d5c36fc5035abcfb930e1a229d65c6cf683f
Author: Alessandro Bellina <ab...@yahoo-inc.com>
AuthorDate: Sat Dec 22 09:03:02 2018 -0600
[SPARK-26285][CORE] accumulator metrics sources for LongAccumulator and Doub…
…leAccumulator
## What changes were proposed in this pull request?
This PR implements metric sources for LongAccumulator and DoubleAccumulator, such that a user can register these accumulators easily and have their values be reported by the driver's metric namespace.
## How was this patch tested?
Unit tests, and manual tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes #23242 from abellina/SPARK-26285_accumulator_source.
Lead-authored-by: Alessandro Bellina <ab...@yahoo-inc.com>
Co-authored-by: Alessandro Bellina <ab...@oath.com>
Co-authored-by: Alessandro Bellina <ab...@gmail.com>
Signed-off-by: Thomas Graves <tg...@apache.org>
---
.../spark/metrics/source/AccumulatorSource.scala | 89 +++++++++++++++++++++
.../metrics/source/AccumulatorSourceSuite.scala | 91 ++++++++++++++++++++++
.../spark/examples/AccumulatorMetricsTest.scala | 77 ++++++++++++++++++
3 files changed, 257 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala b/core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala
new file mode 100644
index 0000000..45a4d22
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.util.{AccumulatorV2, DoubleAccumulator, LongAccumulator}
+
+/**
+ * AccumulatorSource is a Spark metric Source that exposes each registered
+ * accumulator as a gauge whose value is read from the accumulator on demand.
+ *
+ * It is restricted to the LongAccumulator and the DoubleAccumulator, as those
+ * are the current built-in numerical accumulators with Spark, and excludes
+ * the CollectionAccumulator, as that is a List of values (hard to report
+ * to a metrics system).
+ */
+private[spark] class AccumulatorSource extends Source {
+ private val registry = new MetricRegistry // one registry per source instance
+ protected def register[T](accumulators: Map[String, AccumulatorV2[_, T]]): Unit = {
+ accumulators.foreach { // one gauge per (name, accumulator) entry
+ case (name, accumulator) =>
+ val gauge = new Gauge[T] {
+ override def getValue: T = accumulator.value // evaluated each time the gauge is read
+ }
+ registry.register(MetricRegistry.name(name), gauge) // metric name derived from the map key
+ }
+ }
+
+ override def sourceName: String = "AccumulatorSource"
+ override def metricRegistry: MetricRegistry = registry
+}
+
+@Experimental
+class LongAccumulatorSource extends AccumulatorSource // adds no members of its own
+
+@Experimental
+class DoubleAccumulatorSource extends AccumulatorSource // adds no members of its own
+
+/**
+ * :: Experimental ::
+ * Metrics source specifically for LongAccumulators. Accumulators
+ * are only valid on the driver side, so these metrics are reported
+ * only by the driver.
+ * Register LongAccumulators using:
+ * LongAccumulatorSource.register(sc, Map("name" -> longAccumulator))
+ */
+@Experimental
+object LongAccumulatorSource {
+ def register(sc: SparkContext, accumulators: Map[String, LongAccumulator]): Unit = {
+ val source = new LongAccumulatorSource // a fresh source (and registry) on every call
+ source.register(accumulators) // one gauge per map entry, keyed by the entry's name
+ sc.env.metricsSystem.registerSource(source) // hand the populated source to the metrics system
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Metrics source specifically for DoubleAccumulators. Accumulators
+ * are only valid on the driver side, so these metrics are reported
+ * only by the driver.
+ * Register DoubleAccumulators using:
+ * DoubleAccumulatorSource.register(sc, Map("name" -> doubleAccumulator))
+ */
+@Experimental
+object DoubleAccumulatorSource {
+ def register(sc: SparkContext, accumulators: Map[String, DoubleAccumulator]): Unit = {
+ val source = new DoubleAccumulatorSource // a fresh source (and registry) on every call
+ source.register(accumulators) // one gauge per map entry, keyed by the entry's name
+ sc.env.metricsSystem.registerSource(source) // hand the populated source to the metrics system
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/source/AccumulatorSourceSuite.scala b/core/src/test/scala/org/apache/spark/metrics/source/AccumulatorSourceSuite.scala
new file mode 100644
index 0000000..6a6c07c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/source/AccumulatorSourceSuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{mock, never, spy, times, verify, when}
+
+import org.apache.spark.{SparkContext, SparkEnv, SparkFunSuite}
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.util.{DoubleAccumulator, LongAccumulator}
+
+class AccumulatorSourceSuite extends SparkFunSuite {
+  test("that that accumulators register against the metric system's register") {
+    val acc1 = new LongAccumulator()
+    val acc2 = new LongAccumulator()
+    val mockContext = mock(classOf[SparkContext])
+    val mockEnvironment = mock(classOf[SparkEnv])
+    val mockMetricSystem = mock(classOf[MetricsSystem])
+    when(mockEnvironment.metricsSystem) thenReturn (mockMetricSystem)
+    when(mockContext.env) thenReturn (mockEnvironment)
+    val accs = Map("my-accumulator-1" -> acc1,
+      "my-accumulator-2" -> acc2)
+    LongAccumulatorSource.register(mockContext, accs)
+    val captor = ArgumentCaptor.forClass(classOf[AccumulatorSource]) // factory, not deprecated ctor
+    verify(mockMetricSystem, times(1)).registerSource(captor.capture()) // exactly one source
+    val source = captor.getValue()
+    val gauges = source.metricRegistry.getGauges() // sorted by metric name
+    assert(gauges.size == 2)
+    assert(gauges.firstKey == "my-accumulator-1")
+    assert(gauges.lastKey == "my-accumulator-2")
+  }
+
+  test("the accumulators value property is checked when the gauge's value is requested") {
+    val acc1 = new LongAccumulator()
+    acc1.add(123)
+    val acc2 = new LongAccumulator()
+    acc2.add(456)
+    val mockContext = mock(classOf[SparkContext])
+    val mockEnvironment = mock(classOf[SparkEnv])
+    val mockMetricSystem = mock(classOf[MetricsSystem])
+    when(mockEnvironment.metricsSystem) thenReturn (mockMetricSystem)
+    when(mockContext.env) thenReturn (mockEnvironment)
+    val accs = Map("my-accumulator-1" -> acc1,
+      "my-accumulator-2" -> acc2)
+    LongAccumulatorSource.register(mockContext, accs)
+    val captor = ArgumentCaptor.forClass(classOf[AccumulatorSource]) // factory, not deprecated ctor
+    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
+    val source = captor.getValue()
+    val gauges = source.metricRegistry.getGauges()
+    assert(gauges.get("my-accumulator-1").getValue() == 123) // gauge reflects acc1's total
+    assert(gauges.get("my-accumulator-2").getValue() == 456) // gauge reflects acc2's total
+  }
+
+  test("the double accumulators value propety is checked when the gauge's value is requested") {
+    val acc1 = new DoubleAccumulator()
+    acc1.add(123.123)
+    val acc2 = new DoubleAccumulator()
+    acc2.add(456.456)
+    val mockContext = mock(classOf[SparkContext])
+    val mockEnvironment = mock(classOf[SparkEnv])
+    val mockMetricSystem = mock(classOf[MetricsSystem])
+    when(mockEnvironment.metricsSystem) thenReturn (mockMetricSystem)
+    when(mockContext.env) thenReturn (mockEnvironment)
+    val accs = Map(
+      "my-accumulator-1" -> acc1,
+      "my-accumulator-2" -> acc2)
+    DoubleAccumulatorSource.register(mockContext, accs)
+    val captor = ArgumentCaptor.forClass(classOf[AccumulatorSource]) // factory, not deprecated ctor
+    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
+    val source = captor.getValue()
+    val gauges = source.metricRegistry.getGauges()
+    assert(gauges.get("my-accumulator-1").getValue() == 123.123) // gauge reflects acc1's total
+    assert(gauges.get("my-accumulator-2").getValue() == 456.456) // gauge reflects acc2's total
+  }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/AccumulatorMetricsTest.scala b/examples/src/main/scala/org/apache/spark/examples/AccumulatorMetricsTest.scala
new file mode 100644
index 0000000..5d9a9a7
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/AccumulatorMetricsTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import org.apache.spark.metrics.source.{DoubleAccumulatorSource, LongAccumulatorSource}
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Usage: AccumulatorMetricsTest [numElem]
+ *
+ * This example shows how to register accumulators against the accumulator source.
+ * A simple RDD is created, and during the foreach, the accumulators are incremented.
+ *
+ * The only argument, numElem, sets the number of elements in the collection to parallelize.
+ *
+ * The result is output to stdout in the driver with the values of the accumulators.
+ * For the long accumulator, it should equal numElem; the double accumulator should be
+ * roughly 1.1 x numElem (within double precision). This example also sets up a
+ * ConsoleSink (metrics) instance, and so registered codahale metrics (like the
+ * accumulator source) are reported to stdout as well.
+ */
+object AccumulatorMetricsTest {
+  def main(args: Array[String]): Unit = {
+
+    val spark = SparkSession
+      .builder()
+      .config("spark.metrics.conf.*.sink.console.class",
+        "org.apache.spark.metrics.sink.ConsoleSink")
+      .getOrCreate()
+
+    val sc = spark.sparkContext
+
+    val acc = sc.longAccumulator("my-long-metric")
+    // register the accumulator, the metric system will report as
+    // [spark.metrics.namespace].[execId|driver].AccumulatorSource.my-long-metric
+    LongAccumulatorSource.register(sc, Map("my-long-metric" -> acc))
+
+    val acc2 = sc.doubleAccumulator("my-double-metric")
+    // register the accumulator, the metric system will report as
+    // [spark.metrics.namespace].[execId|driver].AccumulatorSource.my-double-metric
+    DoubleAccumulatorSource.register(sc, Map("my-double-metric" -> acc2))
+
+    val num = if (args.length > 0) args(0).toInt else 1000000
+
+    val startTime = System.nanoTime
+
+    sc.parallelize(1 to num).foreach(_ => {
+      acc.add(1)
+      acc2.add(1.1)
+    })
+
+    // Print a footer with test time and accumulator values
+    println("Test took %.0f milliseconds".format((System.nanoTime - startTime) / 1E6))
+    println("Accumulator values:")
+    println("*** Long accumulator (my-long-metric): " + acc.value)
+    println("*** Double accumulator (my-double-metric): " + acc2.value)
+
+    spark.stop()
+  }
+}
+// scalastyle:on println
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org