You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/05/29 16:00:37 UTC

[GitHub] [samza] rmatharu commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

rmatharu commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r432583479



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -19,53 +19,47 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, JobConfig, MetricsConfig, SerializerConfig, StreamConfig, SystemConfig}
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
-import org.apache.samza.system.SystemFactory
+import org.apache.samza.config._
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter, MetricsReporterFactory}
+import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde, SerdeFactory}
+import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 
 class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
-  def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
-    info("Creating new metrics snapshot reporter.")
-
-    val jobConfig = new JobConfig(config)
-    val jobName = JavaOptionals.toRichOptional(jobConfig.getName).toOption
-      .getOrElse(throw new SamzaException("Job name must be defined in config."))
-    val jobId = jobConfig.getJobId
-
-    val metricsConfig = new MetricsConfig(config)
-    val metricsSystemStreamName = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name))
-      .toOption
-      .getOrElse(throw new SamzaException("No metrics stream defined in config."))
-
-    val systemStream = StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
-
-    info("Got system stream %s." format systemStream)
-
-    val systemName = systemStream.getSystem
 
+  def getProducer(reporterName: String, config: Config, registry: MetricsRegistryMap): SystemProducer = {

Review comment:
       Should `getProducer` be private?

##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -19,53 +19,47 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, JobConfig, MetricsConfig, SerializerConfig, StreamConfig, SystemConfig}
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
-import org.apache.samza.system.SystemFactory
+import org.apache.samza.config._
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter, MetricsReporterFactory}
+import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde, SerdeFactory}
+import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 
 class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
-  def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
-    info("Creating new metrics snapshot reporter.")
-
-    val jobConfig = new JobConfig(config)
-    val jobName = JavaOptionals.toRichOptional(jobConfig.getName).toOption
-      .getOrElse(throw new SamzaException("Job name must be defined in config."))
-    val jobId = jobConfig.getJobId
-
-    val metricsConfig = new MetricsConfig(config)
-    val metricsSystemStreamName = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name))
-      .toOption
-      .getOrElse(throw new SamzaException("No metrics stream defined in config."))
-
-    val systemStream = StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
-
-    info("Got system stream %s." format systemStream)
-
-    val systemName = systemStream.getSystem
 
+  def getProducer(reporterName: String, config: Config, registry: MetricsRegistryMap): SystemProducer = {
     val systemConfig = new SystemConfig(config)
+    val systemName = getSystemStream(reporterName, config).getSystem
     val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
       .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))
-
     val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory])
 
     info("Got system factory %s." format systemFactory)
+    val producer = systemFactory.getProducer(systemName, config, registry)
+    info("Got producer %s." format producer)
 
-    val registry = new MetricsRegistryMap
+    producer
+  }
 
-    val producer = systemFactory.getProducer(systemName, config, registry)
+  def getSystemStream(reporterName: String, config: Config): SystemStream = {

Review comment:
       Should `getSystemStream` be private?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org