You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/10/21 02:07:19 UTC
[spark] branch branch-3.2 updated: [SPARK-37078][CORE] Support old
3-parameter Sink constructors
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 70c1d60 [SPARK-37078][CORE] Support old 3-parameter Sink constructors
70c1d60 is described below
commit 70c1d603cb4287cfac6cef0325150662214b7129
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Wed Oct 20 19:05:36 2021 -0700
[SPARK-37078][CORE] Support old 3-parameter Sink constructors
### What changes were proposed in this pull request?
This PR aims to support 3-parameter Sink constructors which have `SecurityManager` as the 3rd parameter.
### Why are the changes needed?
Apache Spark 3.2.0 cannot load old Sink libraries because SPARK-34520 removed `SecurityManager` parameter and try to create Sink class with new two-parameter constructor only.
### Does this PR introduce _any_ user-facing change?
This will recover the breaking change.
### How was this patch tested?
Pass the CIs with newly added test coverage.
Closes #34348 from dongjoon-hyun/SPARK-37078.
Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 427816d165c5754e0e017bec3fdc7ae34a6786c1)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../org/apache/spark/metrics/MetricsSystem.scala | 17 +++++++++++++----
.../apache/spark/metrics/MetricsSystemSuite.scala | 21 +++++++++++++++++++++
2 files changed, 34 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index abece1e..a5903de 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
import com.codahale.metrics.{Metric, MetricRegistry}
import org.eclipse.jetty.servlet.ServletContextHandler
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.sink.{MetricsServlet, PrometheusServlet, Sink}
@@ -211,9 +211,18 @@ private[spark] class MetricsSystem private (
.newInstance(kv._2, registry)
prometheusServlet = Some(servlet)
} else {
- val sink = Utils.classForName[Sink](classPath)
- .getConstructor(classOf[Properties], classOf[MetricRegistry])
- .newInstance(kv._2, registry)
+ val sink = try {
+ Utils.classForName[Sink](classPath)
+ .getConstructor(classOf[Properties], classOf[MetricRegistry])
+ .newInstance(kv._2, registry)
+ } catch {
+ case _: NoSuchMethodException =>
+ // Fallback to three-parameters constructor having SecurityManager
+ Utils.classForName[Sink](classPath)
+ .getConstructor(
+ classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+ .newInstance(kv._2, registry, null)
+ }
sinks += sink
}
} catch {
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 31d8492..0d4be5b 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.metrics
+import java.util.Properties
+
import scala.collection.mutable.ArrayBuffer
import com.codahale.metrics.MetricRegistry
@@ -269,4 +271,23 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
assert(metricName === source.sourceName)
}
+ test("SPARK-37078: Support old 3-parameter Sink constructors") {
+ conf.set(
+ "spark.metrics.conf.*.sink.jmx.class",
+ "org.apache.spark.metrics.ThreeParameterConstructorSink")
+ val metricsSystem = MetricsSystem.createMetricsSystem("legacy", conf)
+ metricsSystem.start()
+ val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks"))
+
+ assert(metricsSystem.invokePrivate(sinks()).length === 1)
+ }
+}
+
+class ThreeParameterConstructorSink(
+ val property: Properties,
+ val registry: MetricRegistry,
+ securityMgr: SecurityManager) extends Sink {
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def report(): Unit = {}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org