Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/01 17:49:59 UTC

[spark] branch master updated: [SPARK-26967][CORE] Put MetricsSystem instance names together for clearer management

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 86b25c4  [SPARK-26967][CORE] Put MetricsSystem instance names together for clearer management
86b25c4 is described below

commit 86b25c43506a0a858601bd1082e9f96bb0415eb8
Author: SongYadong <so...@zte.com.cn>
AuthorDate: Fri Mar 1 11:49:43 2019 -0600

    [SPARK-26967][CORE] Put MetricsSystem instance names together for clearer management
    
    ## What changes were proposed in this pull request?
    
    `MetricsSystem` instances are created in many places across the project, and their names are scattered along with them, which makes them inconvenient to browse and manage.
    This PR gathers the instance names into a single object. That gives us one uniform location for adding or removing them, and an overall view of the `MetricsSystem` instances in the project.
    It also helps with maintaining the user documentation, since a single canonical list makes it harder to miss an instance.
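
    As a minimal sketch of the pattern (the full set of constants added is in `MetricsSystem.scala` in the diff below):

    ```scala
    // Before: every call site hard-codes its instance name as a string literal.
    val ms = MetricsSystem.createMetricsSystem("driver", conf, securityManager)

    // After: the name comes from one shared object, so the complete list of
    // instances can be reviewed (and documented) in a single place.
    val ms = MetricsSystem.createMetricsSystem(
      MetricsSystemInstances.DRIVER, conf, securityManager)
    ```

    These are the same instance names that users reference in `conf/metrics.properties` (for example, `master.sink.console.period=10` scopes a sink option to the master instance), which is why a single canonical list makes the monitoring documentation easier to keep complete.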
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #23869 from SongYadong/metrics_system_inst_manage.
    
    Lead-authored-by: SongYadong <so...@zte.com.cn>
    Co-authored-by: walter2001 <yd...@163.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../src/main/scala/org/apache/spark/SparkEnv.scala |  7 +++---
 .../spark/deploy/ExternalShuffleService.scala      |  5 +++--
 .../org/apache/spark/deploy/master/Master.scala    |  9 ++++----
 .../org/apache/spark/deploy/worker/Worker.scala    |  5 +++--
 .../org/apache/spark/metrics/MetricsSystem.scala   | 26 ++++++++++++++++++++++
 .../apache/spark/metrics/MetricsSystemSuite.scala  | 18 +++++++--------
 .../cluster/mesos/MesosClusterScheduler.scala      |  5 +++--
 .../spark/deploy/yarn/ApplicationMaster.scala      |  5 +++--
 8 files changed, 56 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 799e7e4..0ac6f08 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -34,7 +34,7 @@ import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
@@ -346,13 +346,14 @@ object SparkEnv extends Logging {
       // Don't start metrics system right now for Driver.
       // We need to wait for the task scheduler to give us an app ID.
       // Then we can start the metrics system.
-      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
+      MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf, securityManager)
     } else {
       // We need to set the executor ID before the MetricsSystem is created because sources and
       // sinks specified in the metrics configuration file will want to incorporate this executor's
       // ID into the metrics they report.
       conf.set(EXECUTOR_ID, executorId)
-      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+      val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf,
+        securityManager)
       ms.start()
       ms
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 03e3abb..edfd2ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.crypto.AuthServerBootstrap
 import org.apache.spark.network.netty.SparkTransportConf
@@ -43,7 +43,8 @@ private[deploy]
 class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
   extends Logging {
   protected val masterMetricsSystem =
-    MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
+    MetricsSystem.createMetricsSystem(MetricsSystemInstances.SHUFFLE_SERVICE,
+      sparkConf, securityManager)
 
   private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a5aecd2..356e52c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -37,7 +37,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -86,9 +86,10 @@ private[deploy] class Master(
 
   Utils.checkHost(address.host)
 
-  private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
-  private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
-    securityMgr)
+  private val masterMetricsSystem =
+    MetricsSystem.createMetricsSystem(MetricsSystemInstances.MASTER, conf, securityMgr)
+  private val applicationMetricsSystem =
+    MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATIONS, conf, securityMgr)
   private val masterSource = new MasterSource(this)
 
   // After onStart, webUi will be set
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 49cda0f..07a9545 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -40,7 +40,7 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
@@ -159,7 +159,8 @@ private[deploy] class Worker(
 
   private var connectionAttemptCount = 0
 
-  private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
+  private val metricsSystem =
+    MetricsSystem.createMetricsSystem(MetricsSystemInstances.WORKER, conf, securityMgr)
   private val workerSource = new WorkerSource(this)
 
   val reverseProxy = conf.get(UI_REVERSE_PROXY)
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index b1e311a..89dd74e 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -235,3 +235,29 @@ private[spark] object MetricsSystem {
     new MetricsSystem(instance, conf, securityMgr)
   }
 }
+
+private[spark] object MetricsSystemInstances {
+  // The Spark standalone master process
+  val MASTER = "master"
+
+  // A component within the master which reports on various applications
+  val APPLICATIONS = "applications"
+
+  // A Spark standalone worker process
+  val WORKER = "worker"
+
+  // A Spark executor
+  val EXECUTOR = "executor"
+
+  // The Spark driver process (the process in which your SparkContext is created)
+  val DRIVER = "driver"
+
+  // The Spark shuffle service
+  val SHUFFLE_SERVICE = "shuffleService"
+
+  // The Spark ApplicationMaster when running on YARN
+  val APPLICATION_MASTER = "applicationMaster"
+
+  // The Spark cluster scheduler when running on Mesos
+  val MESOS_CLUSTER = "mesos_cluster"
+}
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index c512f29..99c9dde 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -76,7 +76,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
     conf.set("spark.app.id", appId)
     conf.set("spark.executor.id", executorId)
 
-    val instanceName = "driver"
+    val instanceName = MetricsSystemInstances.DRIVER
     val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
 
     val metricName = driverMetricsSystem.buildRegistryName(source)
@@ -92,7 +92,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
     val executorId = "driver"
     conf.set("spark.executor.id", executorId)
 
-    val instanceName = "driver"
+    val instanceName = MetricsSystemInstances.DRIVER
     val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
 
     val metricName = driverMetricsSystem.buildRegistryName(source)
@@ -108,7 +108,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
     val appId = "testId"
     conf.set("spark.app.id", appId)
 
-    val instanceName = "driver"
+    val instanceName = MetricsSystemInstances.DRIVER
     val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
 
     val metricName = driverMetricsSystem.buildRegistryName(source)
@@ -126,7 +126,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
     conf.set("spark.app.id", appId)
     conf.set("spark.executor.id", executorId)
 
-    val instanceName = "executor"
+    val instanceName = MetricsSystemInstances.EXECUTOR
     val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
 
     val metricName = executorMetricsSystem.buildRegistryName(source)
@@ -142,7 +142,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
     val executorId = "1"
     conf.set("spark.executor.id", executorId)
 
-    val instanceName = "executor"
+    val instanceName = MetricsSystemInstances.EXECUTOR
     val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
 
     val metricName = executorMetricsSystem.buildRegistryName(source)
@@ -158,7 +158,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
     val appId = "testId"
     conf.set("spark.app.id", appId)
 
-    val instanceName = "executor"
+    val instanceName = MetricsSystemInstances.EXECUTOR
     val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
 
     val metricName = executorMetricsSystem.buildRegistryName(source)
@@ -200,7 +200,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
     conf.set("spark.executor.id", executorId)
     conf.set(METRICS_NAMESPACE, "${spark.app.name}")
 
-    val instanceName = "executor"
+    val instanceName = MetricsSystemInstances.EXECUTOR
     val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
 
     val metricName = executorMetricsSystem.buildRegistryName(source)
@@ -218,7 +218,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
     conf.set("spark.executor.id", executorId)
     conf.set(METRICS_NAMESPACE, namespaceToResolve)
 
-    val instanceName = "executor"
+    val instanceName = MetricsSystemInstances.EXECUTOR
     val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
 
     val metricName = executorMetricsSystem.buildRegistryName(source)
@@ -238,7 +238,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
     conf.set("spark.app.name", appId)
     conf.set(METRICS_NAMESPACE, "${spark.app.name}")
 
-    val instanceName = "executor"
+    val instanceName = MetricsSystemInstances.EXECUTOR
     val executorMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
 
     val metricName = executorMetricsSystem.buildRegistryName(source)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 9df9169..cd5bd53 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
 import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
 import org.apache.spark.internal.config._
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.util.Utils
 
 /**
@@ -124,7 +124,8 @@ private[spark] class MesosClusterScheduler(
   extends Scheduler with MesosSchedulerUtils {
   var frameworkUrl: String = _
   private val metricsSystem =
-    MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
+    MetricsSystem.createMetricsSystem(MetricsSystemInstances.MESOS_CLUSTER, conf,
+      new SecurityManager(conf))
   private val master = conf.get("spark.master")
   private val appName = conf.get("spark.app.name")
   private val queuedCapacity = conf.get(config.MAX_DRIVERS)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1482b520..91e1fbd 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -46,7 +46,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -471,7 +471,8 @@ private[spark] class ApplicationMaster(
     rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
 
     allocator.allocateResources()
-    val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
+    val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
+      sparkConf, securityMgr)
     val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
     ms.registerSource(new ApplicationMasterSource(prefix, allocator))
     // do not register static sources in this case as per SPARK-25277

