Posted to commits@spark.apache.org by tg...@apache.org on 2018/12/22 15:03:19 UTC

[spark] branch master updated: [SPARK-26285][CORE] accumulator metrics sources for LongAccumulator and Doub…

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a02d5c  [SPARK-26285][CORE] accumulator metrics sources for LongAccumulator and Doub…
0a02d5c is described below

commit 0a02d5c36fc5035abcfb930e1a229d65c6cf683f
Author: Alessandro Bellina <ab...@yahoo-inc.com>
AuthorDate: Sat Dec 22 09:03:02 2018 -0600

    [SPARK-26285][CORE] accumulator metrics sources for LongAccumulator and Doub…
    
    …leAccumulator
    
    ## What changes were proposed in this pull request?
    
    This PR implements metric sources for LongAccumulator and DoubleAccumulator, so that a user can easily register these accumulators and have their values reported under the driver's metrics namespace.
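    
    A minimal usage sketch (assumes a live SparkContext `sc`; the accumulator name `rowsProcessed` is illustrative):
    
        import org.apache.spark.metrics.source.LongAccumulatorSource
    
        // create a named driver-side accumulator and register it as a metrics source;
        // per the in-tree example added by this commit, the driver then reports it as
        // [spark.metrics.namespace].driver.AccumulatorSource.rowsProcessed
        val rowsProcessed = sc.longAccumulator("rowsProcessed")
        LongAccumulatorSource.register(sc, Map("rowsProcessed" -> rowsProcessed))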
    
    ## How was this patch tested?
    
    Unit tests and manual tests.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #23242 from abellina/SPARK-26285_accumulator_source.
    
    Lead-authored-by: Alessandro Bellina <ab...@yahoo-inc.com>
    Co-authored-by: Alessandro Bellina <ab...@oath.com>
    Co-authored-by: Alessandro Bellina <ab...@gmail.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../spark/metrics/source/AccumulatorSource.scala   | 89 +++++++++++++++++++++
 .../metrics/source/AccumulatorSourceSuite.scala    | 91 ++++++++++++++++++++++
 .../spark/examples/AccumulatorMetricsTest.scala    | 77 ++++++++++++++++++
 3 files changed, 257 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala b/core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala
new file mode 100644
index 0000000..45a4d22
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.util.{AccumulatorV2, DoubleAccumulator, LongAccumulator}
+
+/**
+ * AccumulatorSource is a Spark metric Source that reports the current value
+ * of the accumulator as a gauge.
+ *
+ * It is restricted to LongAccumulator and DoubleAccumulator, as those are
+ * the built-in numerical accumulators in Spark. It excludes
+ * CollectionAccumulator, as that holds a list of values, which is hard to
+ * report to a metrics system.
+ */
+private[spark] class AccumulatorSource extends Source {
+  private val registry = new MetricRegistry
+  protected def register[T](accumulators: Map[String, AccumulatorV2[_, T]]): Unit = {
+    accumulators.foreach {
+      case (name, accumulator) =>
+        val gauge = new Gauge[T] {
+          override def getValue: T = accumulator.value
+        }
+        registry.register(MetricRegistry.name(name), gauge)
+    }
+  }
+
+  override def sourceName: String = "AccumulatorSource"
+  override def metricRegistry: MetricRegistry = registry
+}
+
+@Experimental
+class LongAccumulatorSource extends AccumulatorSource
+
+@Experimental
+class DoubleAccumulatorSource extends AccumulatorSource
+
+/**
+ * :: Experimental ::
+ * Metrics source specifically for LongAccumulators. Accumulators
+ * are only valid on the driver side, so these metrics are reported
+ * only by the driver.
+ * Register LongAccumulators using:
+ *    LongAccumulatorSource.register(sc, Map("name" -> longAccumulator))
+ */
+@Experimental
+object LongAccumulatorSource {
+  def register(sc: SparkContext, accumulators: Map[String, LongAccumulator]): Unit = {
+    val source = new LongAccumulatorSource
+    source.register(accumulators)
+    sc.env.metricsSystem.registerSource(source)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Metrics source specifically for DoubleAccumulators. Accumulators
+ * are only valid on the driver side, so these metrics are reported
+ * only by the driver.
+ * Register DoubleAccumulators using:
+ *    DoubleAccumulatorSource.register(sc, Map("name" -> doubleAccumulator))
+ */
+@Experimental
+object DoubleAccumulatorSource {
+  def register(sc: SparkContext, accumulators: Map[String, DoubleAccumulator]): Unit = {
+    val source = new DoubleAccumulatorSource
+    source.register(accumulators)
+    sc.env.metricsSystem.registerSource(source)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/source/AccumulatorSourceSuite.scala b/core/src/test/scala/org/apache/spark/metrics/source/AccumulatorSourceSuite.scala
new file mode 100644
index 0000000..6a6c07c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/source/AccumulatorSourceSuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{mock, times, verify, when}
+
+import org.apache.spark.{SparkContext, SparkEnv, SparkFunSuite}
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.util.{DoubleAccumulator, LongAccumulator}
+
+class AccumulatorSourceSuite extends SparkFunSuite {
+  test("that that accumulators register against the metric system's register") {
+    val acc1 = new LongAccumulator()
+    val acc2 = new LongAccumulator()
+    val mockContext = mock(classOf[SparkContext])
+    val mockEnvironment = mock(classOf[SparkEnv])
+    val mockMetricSystem = mock(classOf[MetricsSystem])
+    when(mockEnvironment.metricsSystem) thenReturn (mockMetricSystem)
+    when(mockContext.env) thenReturn (mockEnvironment)
+    val accs = Map("my-accumulator-1" -> acc1,
+                   "my-accumulator-2" -> acc2)
+    LongAccumulatorSource.register(mockContext, accs)
+    val captor = new ArgumentCaptor[AccumulatorSource]()
+    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
+    val source = captor.getValue()
+    val gauges = source.metricRegistry.getGauges()
+    assert (gauges.size == 2)
+    assert (gauges.firstKey == "my-accumulator-1")
+    assert (gauges.lastKey == "my-accumulator-2")
+  }
+
+  test("the accumulators value property is checked when the gauge's value is requested") {
+    val acc1 = new LongAccumulator()
+    acc1.add(123)
+    val acc2 = new LongAccumulator()
+    acc2.add(456)
+    val mockContext = mock(classOf[SparkContext])
+    val mockEnvironment = mock(classOf[SparkEnv])
+    val mockMetricSystem = mock(classOf[MetricsSystem])
+    when(mockEnvironment.metricsSystem) thenReturn (mockMetricSystem)
+    when(mockContext.env) thenReturn (mockEnvironment)
+    val accs = Map("my-accumulator-1" -> acc1,
+                   "my-accumulator-2" -> acc2)
+    LongAccumulatorSource.register(mockContext, accs)
+    val captor = new ArgumentCaptor[AccumulatorSource]()
+    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
+    val source = captor.getValue()
+    val gauges = source.metricRegistry.getGauges()
+    assert(gauges.get("my-accumulator-1").getValue() == 123)
+    assert(gauges.get("my-accumulator-2").getValue() == 456)
+  }
+
+  test("the double accumulators value propety is checked when the gauge's value is requested") {
+    val acc1 = new DoubleAccumulator()
+    acc1.add(123.123)
+    val acc2 = new DoubleAccumulator()
+    acc2.add(456.456)
+    val mockContext = mock(classOf[SparkContext])
+    val mockEnvironment = mock(classOf[SparkEnv])
+    val mockMetricSystem = mock(classOf[MetricsSystem])
+    when(mockEnvironment.metricsSystem) thenReturn (mockMetricSystem)
+    when(mockContext.env) thenReturn (mockEnvironment)
+    val accs = Map(
+      "my-accumulator-1" -> acc1,
+      "my-accumulator-2" -> acc2)
+    DoubleAccumulatorSource.register(mockContext, accs)
+    val captor = new ArgumentCaptor[AccumulatorSource]()
+    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
+    val source = captor.getValue()
+    val gauges = source.metricRegistry.getGauges()
+    assert(gauges.get("my-accumulator-1").getValue() == 123.123)
+    assert(gauges.get("my-accumulator-2").getValue() == 456.456)
+  }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/AccumulatorMetricsTest.scala b/examples/src/main/scala/org/apache/spark/examples/AccumulatorMetricsTest.scala
new file mode 100644
index 0000000..5d9a9a7
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/AccumulatorMetricsTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import org.apache.spark.metrics.source.{DoubleAccumulatorSource, LongAccumulatorSource}
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Usage: AccumulatorMetricsTest [numElem]
+ *
+ * This example shows how to register accumulators with the accumulator metrics source.
+ * A simple RDD is created, and the accumulators are incremented while iterating over it.
+ *
+ * The only argument, numElem, sets the number of elements in the collection to parallelize.
+ *
+ * The result is output to stdout in the driver with the values of the accumulators.
+ * The long accumulator's value should equal numElem; the double accumulator's should
+ * be roughly 1.1 x numElem (within double precision). This example also sets up a
+ * ConsoleSink (metrics) instance, so registered Codahale metrics (like the
+ * accumulator source) are reported to stdout as well.
+ */
+object AccumulatorMetricsTest {
+  def main(args: Array[String]): Unit = {
+
+    val spark = SparkSession
+      .builder()
+      .config("spark.metrics.conf.*.sink.console.class",
+              "org.apache.spark.metrics.sink.ConsoleSink")
+      .getOrCreate()
+
+    val sc = spark.sparkContext
+
+    val acc = sc.longAccumulator("my-long-metric")
+    // register the accumulator; the metrics system will report it as
+    // [spark.metrics.namespace].[execId|driver].AccumulatorSource.my-long-metric
+    LongAccumulatorSource.register(sc, Map("my-long-metric" -> acc))
+
+    val acc2 = sc.doubleAccumulator("my-double-metric")
+    // register the accumulator; the metrics system will report it as
+    // [spark.metrics.namespace].[execId|driver].AccumulatorSource.my-double-metric
+    DoubleAccumulatorSource.register(sc, Map("my-double-metric" -> acc2))
+
+    val num = if (args.length > 0) args(0).toInt else 1000000
+
+    val startTime = System.nanoTime
+
+    sc.parallelize(1 to num).foreach { _ =>
+      acc.add(1)
+      acc2.add(1.1)
+    }
+
+    // Print a footer with test time and accumulator values
+    println("Test took %.0f milliseconds".format((System.nanoTime - startTime) / 1E6))
+    println("Accumulator values:")
+    println("*** Long accumulator (my-long-metric): " + acc.value)
+    println("*** Double accumulator (my-double-metric): " + acc2.value)
+
+    spark.stop()
+  }
+}
+// scalastyle:on println


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org