Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/05/27 06:12:32 UTC

[GitHub] [samza] abhishekshivanna opened a new pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

abhishekshivanna opened a new pull request #1368:
URL: https://github.com/apache/samza/pull/1368


   Changes:
   The refactoring makes the reporter stable when exceptions
   are encountered. The patch also breaks down the run()
   method in the MetricsSnapshotReporter into smaller
   overridable pieces to make it more extensible and reusable.
   
   Tests: None
   API Changes: None
   Upgrade Instructions: None
   Usage Instructions: None
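
As a rough illustration of the two goals in the description above (not the code in this patch), a reporter can catch exceptions inside each cycle and split the cycle into small overridable steps. The Java sketch below uses hypothetical names (`SketchReporter`, `collect`, `emit`, `FailingReporter`); the real class is written in Scala and its method names may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: the names below are illustrative, not Samza's API.
class SketchReporter implements Runnable {
  final List<String> emitted = new ArrayList<>();
  int failures = 0;

  // run() only orchestrates. A failure in one cycle is caught here so that
  // a scheduler calling run() repeatedly can keep going.
  @Override
  public void run() {
    try {
      for (String snapshot : collect()) {
        emit(snapshot);
      }
    } catch (RuntimeException e) {
      failures++; // a real reporter would log the exception here
    }
  }

  // Overridable step: gather what should be reported in this cycle.
  protected List<String> collect() {
    return Arrays.asList("snapshot-1");
  }

  // Overridable step: send one snapshot to its destination.
  protected void emit(String snapshot) {
    emitted.add(snapshot);
  }
}

// A subclass replaces a single step without copying run().
class FailingReporter extends SketchReporter {
  @Override
  protected void emit(String snapshot) {
    throw new IllegalStateException("destination unavailable");
  }
}
```

One general reason to catch inside `run()`: a periodic task submitted to a `java.util.concurrent.ScheduledExecutorService` has its later executions suppressed once an execution throws. Whether that is the exact failure mode this patch targets is an assumption.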


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] xiefan46 commented on pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
xiefan46 commented on pull request #1368:
URL: https://github.com/apache/samza/pull/1368#issuecomment-635558405


   > > Can we rewrite this Scala code in Java so that it is much easier for us to debug and implement new features?
   > 
   > I agree that we should ideally have one language for the entire project, but changing this from Scala to Java is out of scope for this refactor. We can maybe revisit it as part of a broader initiative to move the Scala bits to Java.
   
   Sure, this is unrelated to this patch; I was just curious. Everything else in this patch LGTM.





[GitHub] [samza] abhishekshivanna commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r432634477



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -19,53 +19,47 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, JobConfig, MetricsConfig, SerializerConfig, StreamConfig, SystemConfig}
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
-import org.apache.samza.system.SystemFactory
+import org.apache.samza.config._
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter, MetricsReporterFactory}
+import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde, SerdeFactory}
+import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 
 class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
-  def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
-    info("Creating new metrics snapshot reporter.")
-
-    val jobConfig = new JobConfig(config)
-    val jobName = JavaOptionals.toRichOptional(jobConfig.getName).toOption
-      .getOrElse(throw new SamzaException("Job name must be defined in config."))
-    val jobId = jobConfig.getJobId
-
-    val metricsConfig = new MetricsConfig(config)
-    val metricsSystemStreamName = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name))
-      .toOption
-      .getOrElse(throw new SamzaException("No metrics stream defined in config."))
-
-    val systemStream = StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
-
-    info("Got system stream %s." format systemStream)
-
-    val systemName = systemStream.getSystem
 
+  def getProducer(reporterName: String, config: Config, registry: MetricsRegistryMap): SystemProducer = {
     val systemConfig = new SystemConfig(config)
+    val systemName = getSystemStream(reporterName, config).getSystem
     val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
       .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))
-
     val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory])
 
     info("Got system factory %s." format systemFactory)
+    val producer = systemFactory.getProducer(systemName, config, registry)
+    info("Got producer %s." format producer)
 
-    val registry = new MetricsRegistryMap
+    producer
+  }
 
-    val producer = systemFactory.getProducer(systemName, config, registry)
+  def getSystemStream(reporterName: String, config: Config): SystemStream = {

Review comment:
       Agreed! Except for the shared producer (see the discussion earlier in this thread), I changed all these methods to be `protected` so that custom factories that extend this class don't have to duplicate code.
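
A minimal sketch of why `protected` helps here, written in Java with hypothetical stand-ins (`BaseReporterFactory`, `FastReporterFactory`, a plain `Map` instead of Samza's `Config`, and made-up config keys). It is not the factory from this patch; it only shows a subclass overriding one helper while inheriting the rest.

```java
import java.util.Map;

// Hypothetical stand-ins; not the Samza factory or its config keys.
class BaseReporterFactory {
  // protected: not part of the public surface, but reusable by subclasses.
  protected String getStreamName(Map<String, String> config) {
    String stream = config.get("stream");
    if (stream == null) {
      throw new IllegalArgumentException("No metrics stream defined in config.");
    }
    return stream;
  }

  protected int getReportingInterval(Map<String, String> config) {
    return Integer.parseInt(config.getOrDefault("reporting.interval", "60"));
  }

  // The public entry point composes the helpers.
  public String describe(Map<String, String> config) {
    return getStreamName(config) + " every " + getReportingInterval(config) + "s";
  }
}

// A custom factory overrides one helper and inherits everything else.
class FastReporterFactory extends BaseReporterFactory {
  @Override
  protected int getReportingInterval(Map<String, String> config) {
    return 5;
  }
}
```

With `private` helpers, `FastReporterFactory` would have to reimplement `describe` and the stream lookup to change only the interval.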







[GitHub] [samza] abhishekshivanna commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r431519645



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -77,15 +71,48 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
     } else {
       new MetricsSnapshotSerdeV2
     }
-
     info("Got serde %s." format serde)
+    serde
+  }
+
 
-    val pollingInterval: Int = metricsConfig.getMetricsSnapshotReporterInterval(name)
+  def getBlacklist(reporterName: String, config: Config): Option[String] = {
+    val metricsConfig = new MetricsConfig(config)
+    val blacklist = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterBlacklist(reporterName)).toOption
+    info("Got blacklist as: %s" format blacklist)
+    blacklist
+  }
 
-    info("Setting polling interval to %d" format pollingInterval)
+  def getPollingInterval(reporterName: String, config: Config): Int = {
+    val metricsConfig = new MetricsConfig(config)
+    val pollingInterval = metricsConfig.getMetricsSnapshotReporterInterval(reporterName)

Review comment:
       Good point! I'm not sure why this was called `pollingInterval` to begin with.
   Fixed it.







[GitHub] [samza] mynameborat commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r434733689



##########
File path: samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
##########
@@ -19,23 +19,51 @@
 
 package org.apache.samza.metrics;
 
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
 import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.serializers.Serializer;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.inmemory.InMemorySystemProducer;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import scala.Some;
 import scala.runtime.AbstractFunction0;
 
+import static org.mockito.Mockito.*;

Review comment:
       Can we use explicit imports instead of wildcards?







[GitHub] [samza] rmatharu commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r432583586



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -19,53 +19,47 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, JobConfig, MetricsConfig, SerializerConfig, StreamConfig, SystemConfig}
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
-import org.apache.samza.system.SystemFactory
+import org.apache.samza.config._
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter, MetricsReporterFactory}
+import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde, SerdeFactory}
+import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 
 class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
-  def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
-    info("Creating new metrics snapshot reporter.")
-
-    val jobConfig = new JobConfig(config)
-    val jobName = JavaOptionals.toRichOptional(jobConfig.getName).toOption
-      .getOrElse(throw new SamzaException("Job name must be defined in config."))
-    val jobId = jobConfig.getJobId
-
-    val metricsConfig = new MetricsConfig(config)
-    val metricsSystemStreamName = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name))
-      .toOption
-      .getOrElse(throw new SamzaException("No metrics stream defined in config."))
-
-    val systemStream = StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
-
-    info("Got system stream %s." format systemStream)
-
-    val systemName = systemStream.getSystem
 
+  def getProducer(reporterName: String, config: Config, registry: MetricsRegistryMap): SystemProducer = {
     val systemConfig = new SystemConfig(config)
+    val systemName = getSystemStream(reporterName, config).getSystem
     val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
       .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))
-
     val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory])
 
     info("Got system factory %s." format systemFactory)
+    val producer = systemFactory.getProducer(systemName, config, registry)
+    info("Got producer %s." format producer)
 
-    val registry = new MetricsRegistryMap
+    producer
+  }
 
-    val producer = systemFactory.getProducer(systemName, config, registry)
+  def getSystemStream(reporterName: String, config: Config): SystemStream = {

Review comment:
       private?
   Same for all the other helper methods below that are invoked only from within this class.







[GitHub] [samza] abhishekshivanna commented on pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna commented on pull request #1368:
URL: https://github.com/apache/samza/pull/1368#issuecomment-636087332


   > Can you explain why you need to share producers here?
   
   > Ideally we should inject the shared objects during the construction and manage the lifecycle of the shared object outside the scope of its dependencies.
   
   I completely agree! Unfortunately, the implementation of `DiagnosticsUtil` [1], which instantiates the `MetricsSnapshotReporter` and `DiagnosticsManager`, uses a shared producer whose lifecycle is managed only by `MetricsSnapshotReporter`.
   
   What do you think about fixing `DiagnosticsManager` and `MetricsSnapshotReporter` to each have their own independent producers?
   
   [1] https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java#L136





[GitHub] [samza] mynameborat merged pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1368:
URL: https://github.com/apache/samza/pull/1368


   





[GitHub] [samza] abhishekshivanna commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r431359170



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
##########
@@ -160,12 +171,9 @@ class MetricsSnapshotReporter(
         }
       }
     }
-
-
     debug("Finished flushing metrics.")
   }
 
-

Review comment:
       Makes sense. Fixed the log level.







[GitHub] [samza] rmatharu commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r432583479



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -19,53 +19,47 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, JobConfig, MetricsConfig, SerializerConfig, StreamConfig, SystemConfig}
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
-import org.apache.samza.system.SystemFactory
+import org.apache.samza.config._
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter, MetricsReporterFactory}
+import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde, SerdeFactory}
+import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 
 class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
-  def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
-    info("Creating new metrics snapshot reporter.")
-
-    val jobConfig = new JobConfig(config)
-    val jobName = JavaOptionals.toRichOptional(jobConfig.getName).toOption
-      .getOrElse(throw new SamzaException("Job name must be defined in config."))
-    val jobId = jobConfig.getJobId
-
-    val metricsConfig = new MetricsConfig(config)
-    val metricsSystemStreamName = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name))
-      .toOption
-      .getOrElse(throw new SamzaException("No metrics stream defined in config."))
-
-    val systemStream = StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
-
-    info("Got system stream %s." format systemStream)
-
-    val systemName = systemStream.getSystem
 
+  def getProducer(reporterName: String, config: Config, registry: MetricsRegistryMap): SystemProducer = {

Review comment:
       private ?

##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -19,53 +19,47 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, JobConfig, MetricsConfig, SerializerConfig, StreamConfig, SystemConfig}
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
-import org.apache.samza.system.SystemFactory
+import org.apache.samza.config._
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter, MetricsReporterFactory}
+import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde, SerdeFactory}
+import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 
 class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
-  def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
-    info("Creating new metrics snapshot reporter.")
-
-    val jobConfig = new JobConfig(config)
-    val jobName = JavaOptionals.toRichOptional(jobConfig.getName).toOption
-      .getOrElse(throw new SamzaException("Job name must be defined in config."))
-    val jobId = jobConfig.getJobId
-
-    val metricsConfig = new MetricsConfig(config)
-    val metricsSystemStreamName = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name))
-      .toOption
-      .getOrElse(throw new SamzaException("No metrics stream defined in config."))
-
-    val systemStream = StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
-
-    info("Got system stream %s." format systemStream)
-
-    val systemName = systemStream.getSystem
 
+  def getProducer(reporterName: String, config: Config, registry: MetricsRegistryMap): SystemProducer = {
     val systemConfig = new SystemConfig(config)
+    val systemName = getSystemStream(reporterName, config).getSystem
     val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
       .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))
-
     val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory])
 
     info("Got system factory %s." format systemFactory)
+    val producer = systemFactory.getProducer(systemName, config, registry)
+    info("Got producer %s." format producer)
 
-    val registry = new MetricsRegistryMap
+    producer
+  }
 
-    val producer = systemFactory.getProducer(systemName, config, registry)
+  def getSystemStream(reporterName: String, config: Config): SystemStream = {

Review comment:
       private ?







[GitHub] [samza] abhishekshivanna edited a comment on pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna edited a comment on pull request #1368:
URL: https://github.com/apache/samza/pull/1368#issuecomment-635468266


   > Can we rewrite this Scala code in Java so that it is much easier for us to debug and implement new features?
   
   I agree that we should ideally have one language for the entire project, but changing this from Scala to Java is out of scope for this refactor. We can maybe revisit it as part of a broader initiative to move the Scala bits to Java.
   





[GitHub] [samza] abhishekshivanna commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r432712260



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
##########
@@ -106,9 +106,20 @@ class MetricsSnapshotReporter(
     }
   }
 
-  def run {
-    debug("Begin flushing metrics.")
+  def run() {

Review comment:
       Done







[GitHub] [samza] mynameborat commented on pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
mynameborat commented on pull request #1368:
URL: https://github.com/apache/samza/pull/1368#issuecomment-636024955


   The changes to exception handling and the refactoring of `run` look good to me. I have a few questions about why `getProducer` is exposed in `MetricsSnapshotReporter`.
   
   > so we can share the existing producer with
   > when creating wrappers around the MetricsSnapshotReporter's stream (eg: DiagnosticsManager)
   
   1. Can you explain why you need to share producers here?
   2. By wrappers, do you mean class extensions of MetricsSnapshotReporter? Ideally we should inject the shared objects during the construction and manage the lifecycle of the shared object outside the scope of its dependencies. With the current setup, `MetricsSnapshotReporter` manages the lifecycle of the producer and may handle it in a way that doesn't align with the expectations of `DiagnosticsManager`, which can cause problems. For example, `MetricsSnapshotReporter` may close or recreate the producer after a failure, and `DiagnosticsManager` may then operate on a producer handle that is stale or no longer active.
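
The injection approach described in point 2 can be sketched as follows. This is a hedged Java illustration with hypothetical types (`FakeProducer`, `ReporterUser`, `DiagnosticsUser`, `SharedProducerDemo`); it does not use Samza's `SystemProducer` and is not how `DiagnosticsUtil` is written today.

```java
// Hypothetical sketch; FakeProducer stands in for a real producer.
class FakeProducer implements AutoCloseable {
  private boolean open = true;
  int sent = 0;

  void send(String message) {
    if (!open) {
      throw new IllegalStateException("producer is closed");
    }
    sent++;
  }

  @Override
  public void close() {
    open = false;
  }
}

// Both users receive the producer through their constructor; neither closes it.
class ReporterUser {
  private final FakeProducer producer;
  ReporterUser(FakeProducer producer) { this.producer = producer; }
  void report() { producer.send("metrics"); }
}

class DiagnosticsUser {
  private final FakeProducer producer;
  DiagnosticsUser(FakeProducer producer) { this.producer = producer; }
  void publish() { producer.send("diagnostics"); }
}

class SharedProducerDemo {
  // The owner creates the producer, injects it, and closes it only after
  // every user is done, so no user is left holding a stale handle.
  static int runOnce() {
    try (FakeProducer producer = new FakeProducer()) {
      ReporterUser reporter = new ReporterUser(producer);
      DiagnosticsUser diagnostics = new DiagnosticsUser(producer);
      reporter.report();
      diagnostics.publish();
      return producer.sent;
    }
  }
}
```

If `ReporterUser` closed the producer itself, a later `DiagnosticsUser.publish()` would hit the `IllegalStateException`, which is the stale-handle problem raised above.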





[GitHub] [samza] rmatharu commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r432587309



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
##########
@@ -106,9 +106,20 @@ class MetricsSnapshotReporter(
     }
   }
 
-  def run {
-    debug("Begin flushing metrics.")
+  def run() {

Review comment:
       Do you think it'd be possible to add a test for the emission logic in TestMetricsSnapshotReporter?
   Somehow it was overlooked before.
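
One way such an emission test could be shaped is with a producer double that records what it is asked to send. The Java sketch below uses hand-written, hypothetical stand-ins (`SketchProducer`, `RecordingProducer`, `EmittingReporter`) rather than Samza's `SystemProducer` or a mocking library, so it only illustrates the idea and is not the test added in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: minimal stand-ins, not Samza's SystemProducer or
// MetricsSnapshotReporter.
interface SketchProducer {
  void send(String source, String message);
}

// A hand-written fake that records what was sent, playing the role that a
// mock with an argument captor would play in a real test.
class RecordingProducer implements SketchProducer {
  final List<String> messages = new ArrayList<>();

  @Override
  public void send(String source, String message) {
    messages.add(source + ":" + message);
  }
}

class EmittingReporter {
  private final SketchProducer producer;
  private final String source;

  EmittingReporter(SketchProducer producer, String source) {
    this.producer = producer;
    this.source = source;
  }

  // One reporting cycle: emit one message per metric value.
  void run(List<String> metricValues) {
    for (String value : metricValues) {
      producer.send(source, value);
    }
  }
}
```

A test would construct `EmittingReporter` with a `RecordingProducer`, call `run` once, and assert on `messages`.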







[GitHub] [samza] abhishekshivanna commented on pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna commented on pull request #1368:
URL: https://github.com/apache/samza/pull/1368#issuecomment-635468266


   > Can we rewrite this Scala code in Java so that it is much easier for us to debug and implement new features?
   
   I agree that we should ideally have one language for the entire project, but changing this from Scala to Java is out of scope for this refactor. We can maybe revisit it as part of a broader initiative to move the Scala bits to Java.
   





[GitHub] [samza] prateekm commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r431308428



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
##########
@@ -160,12 +171,9 @@ class MetricsSnapshotReporter(
         }
       }
     }
-
-
     debug("Finished flushing metrics.")
   }
 
-

Review comment:
       Unrelated to this PR but useful: can you change the info log below ("Blacklisted metric %s because it matched ...") so that it is either logged once for all metrics or logged at debug level?
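
The "log once with all metrics" option could look roughly like the Java sketch below. `BlacklistSummary` and `summarize` are hypothetical names, and full-string regex matching is an assumption; the reporter's actual filtering code may match differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch; not the reporter's actual filtering code.
class BlacklistSummary {
  // Returns a single summary line covering every metric that matches the
  // blacklist, or null when nothing matched, instead of one line per metric.
  static String summarize(List<String> metricNames, String blacklistRegex) {
    Pattern pattern = Pattern.compile(blacklistRegex);
    List<String> blacklisted = new ArrayList<>();
    for (String name : metricNames) {
      if (pattern.matcher(name).matches()) { // assumption: whole-name match
        blacklisted.add(name);
      }
    }
    if (blacklisted.isEmpty()) {
      return null;
    }
    return "Blacklisted " + blacklisted.size() + " metrics matching "
        + blacklistRegex + ": " + blacklisted;
  }
}
```

The caller would log the returned line once per cycle at info level, or skip logging when the result is null.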







[GitHub] [samza] abhishekshivanna commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r432631182



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -77,29 +71,62 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
     } else {
       new MetricsSnapshotSerdeV2
     }
-
     info("Got serde %s." format serde)
+    serde
+  }
+
 
-    val pollingInterval: Int = metricsConfig.getMetricsSnapshotReporterInterval(name)
+  def getBlacklist(reporterName: String, config: Config): Option[String] = {
+    val metricsConfig = new MetricsConfig(config)
+    val blacklist = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterBlacklist(reporterName)).toOption
+    info("Got blacklist as: %s" format blacklist)
+    blacklist
+  }
 
-    info("Setting polling interval to %d" format pollingInterval)
+  def getReportingInterval(reporterName: String, config: Config): Int = {
+    val metricsConfig = new MetricsConfig(config)
+    val reportingInterval = metricsConfig.getMetricsSnapshotReporterInterval(reporterName)
+    info("Got reporting interval: %d" format reportingInterval)
+    reportingInterval
+  }
+
+  def getJobId(config: Config): String = {
+    val jobConfig = new JobConfig(config)
+    jobConfig.getJobId
+  }
+
+  def getJobName(config: Config): String = {
+    val jobConfig = new JobConfig(config)
+    JavaOptionals.toRichOptional(jobConfig.getName).toOption
+      .getOrElse(throw new SamzaException("Job name must be defined in config."))
+  }

Review comment:
       I feel it makes the code easier to read. Since this is not a frequently executed code path, it should be fine to accept a stack that is one level deeper in exchange for readability.
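
To make the trade-off concrete, here is a small Java sketch of the two kinds of helper under discussion. A plain `Map` stands in for Samza's `Config`, and the key names and the default job id are assumptions made for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a plain Map stands in for Samza's Config, and the
// key names and default value are assumptions.
class JobConfigHelpers {
  // A one-line lookup with a default: the helper adds little beyond a name.
  static String getJobId(Map<String, String> config) {
    return config.getOrDefault("job.id", "1");
  }

  // A lookup plus a failure message: the helper keeps the call site short.
  static String getJobName(Map<String, String> config) {
    return Optional.ofNullable(config.get("job.name"))
        .orElseThrow(() -> new IllegalStateException("Job name must be defined in config."));
  }
}
```

The first helper is the kind that might be inlined; the second hides enough detail that a named method arguably pays for the extra stack frame.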







[GitHub] [samza] rmatharu commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r432584385



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -77,29 +71,62 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
     } else {
       new MetricsSnapshotSerdeV2
     }
-
     info("Got serde %s." format serde)
+    serde
+  }
+
 
-    val pollingInterval: Int = metricsConfig.getMetricsSnapshotReporterInterval(name)
+  def getBlacklist(reporterName: String, config: Config): Option[String] = {
+    val metricsConfig = new MetricsConfig(config)
+    val blacklist = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterBlacklist(reporterName)).toOption
+    info("Got blacklist as: %s" format blacklist)
+    blacklist
+  }
 
-    info("Setting polling interval to %d" format pollingInterval)
+  def getReportingInterval(reporterName: String, config: Config): Int = {
+    val metricsConfig = new MetricsConfig(config)
+    val reportingInterval = metricsConfig.getMetricsSnapshotReporterInterval(reporterName)
+    info("Got reporting interval: %d" format reportingInterval)
+    reportingInterval
+  }
+
+  def getJobId(config: Config): String = {
+    val jobConfig = new JobConfig(config)
+    jobConfig.getJobId
+  }
+
+  def getJobName(config: Config): String = {
+    val jobConfig = new JobConfig(config)
+    JavaOptionals.toRichOptional(jobConfig.getName).toOption
+      .getOrElse(throw new SamzaException("Job name must be defined in config."))
+  }

Review comment:
       nit: For the things that are just methods in jobConfig, perhaps separate helper methods are overkill?







[GitHub] [samza] mynameborat commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r431440392



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -77,15 +71,48 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
     } else {
       new MetricsSnapshotSerdeV2
     }
-
     info("Got serde %s." format serde)
+    serde
+  }
+
 
-    val pollingInterval: Int = metricsConfig.getMetricsSnapshotReporterInterval(name)
+  def getBlacklist(reporterName: String, config: Config): Option[String] = {
+    val metricsConfig = new MetricsConfig(config)
+    val blacklist = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterBlacklist(reporterName)).toOption
+    info("Got blacklist as: %s" format blacklist)
+    blacklist
+  }
 
-    info("Setting polling interval to %d" format pollingInterval)
+  def getPollingInterval(reporterName: String, config: Config): Int = {
+    val metricsConfig = new MetricsConfig(config)
+    val pollingInterval = metricsConfig.getMetricsSnapshotReporterInterval(reporterName)

Review comment:
       Why is this `pollingInterval` instead of `reportingInterval`? The name makes me think we poll for metrics at this interval, but then the error statement above, which says "Error reporting ... `pollingInterval`", is confusing.







[GitHub] [samza] abhishekshivanna commented on pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna commented on pull request #1368:
URL: https://github.com/apache/samza/pull/1368#issuecomment-634898621


   Edited the description to include the latest patch to MetricsSnapshotReporter:
   
   API Changes: a new `getProducer` method added to MetricsSnapshotReporter
   
   Including this new method so we can share the existing producer
   when creating wrappers around the MetricsSnapshotReporter's stream (e.g. DiagnosticsManager).





[GitHub] [samza] abhishekshivanna edited a comment on pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna edited a comment on pull request #1368:
URL: https://github.com/apache/samza/pull/1368#issuecomment-636087332


   @bharathkk 
   > Can you explain why you need to share producers here?
   
   > Ideally we should inject the shared objects during the construction and manage the lifecycle of the shared object outside the scope of its dependencies.
   
   I completely agree! Unfortunately, `DiagnosticsUtil` [1], which instantiates both the `MetricsSnapshotReporter` and the `DiagnosticsManager`, has them use a shared producer whose lifecycle is managed only by `MetricsSnapshotReporter`.
   
   What do you think about fixing `DiagnosticsManager` and `MetricsSnapshotReporter` to each have their own independent producers?
   
   [1] https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java#L136
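
To make the suggestion quoted above concrete, here is a minimal sketch of injecting a shared producer at construction time while an outside owner manages its lifecycle. Everything in it is hypothetical: `Producer`, `InMemoryProducer`, `Reporter`, and `Diagnostics` are illustrative stand-ins, not Samza's `SystemProducer`, `MetricsSnapshotReporter`, or `DiagnosticsManager`.

```scala
import scala.collection.mutable.ArrayBuffer

object SharedProducerSketch {
  // Hypothetical minimal producer interface (not Samza's SystemProducer).
  trait Producer {
    def start(): Unit
    def send(source: String, message: String): Unit
    def stop(): Unit
  }

  // In-memory stand-in that records what was sent and whether it is running.
  final class InMemoryProducer extends Producer {
    val sent = ArrayBuffer.empty[(String, String)]
    var running = false
    def start(): Unit = { running = true }
    def send(source: String, message: String): Unit = {
      require(running, "producer must be started before sending")
      sent += ((source, message))
    }
    def stop(): Unit = { running = false }
  }

  // Both components receive the producer; neither starts nor stops it.
  final class Reporter(producer: Producer) {
    def report(snapshot: String): Unit = producer.send("reporter", snapshot)
  }

  final class Diagnostics(producer: Producer) {
    def publish(event: String): Unit = producer.send("diagnostics", event)
  }

  // The owner creates the producer, wires it in, and controls its lifecycle.
  def run(): InMemoryProducer = {
    val producer = new InMemoryProducer
    producer.start()
    try {
      new Reporter(producer).report("metrics-snapshot")
      new Diagnostics(producer).publish("container-started")
    } finally {
      producer.stop()
    }
    producer
  }
}
```

With this shape, neither component needs a `getProducer`-style accessor, because the code that wires them together already holds the producer. The alternative raised in this thread, giving each component its own independent producer, would avoid the sharing entirely at the cost of a second connection.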





[GitHub] [samza] xiefan46 commented on pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
xiefan46 commented on pull request #1368:
URL: https://github.com/apache/samza/pull/1368#issuecomment-635043466


   Can we rewrite this Scala code in Java so that it is much easier for us to debug and implement new features?





[GitHub] [samza] abhishekshivanna commented on a change in pull request #1368: SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory

Posted by GitBox <gi...@apache.org>.
abhishekshivanna commented on a change in pull request #1368:
URL: https://github.com/apache/samza/pull/1368#discussion_r432628091



##########
File path: samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
##########
@@ -19,53 +19,47 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, JobConfig, MetricsConfig, SerializerConfig, StreamConfig, SystemConfig}
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
-import org.apache.samza.system.SystemFactory
+import org.apache.samza.config._
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter, MetricsReporterFactory}
+import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde, SerdeFactory}
+import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 
 class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {
-  def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = {
-    info("Creating new metrics snapshot reporter.")
-
-    val jobConfig = new JobConfig(config)
-    val jobName = JavaOptionals.toRichOptional(jobConfig.getName).toOption
-      .getOrElse(throw new SamzaException("Job name must be defined in config."))
-    val jobId = jobConfig.getJobId
-
-    val metricsConfig = new MetricsConfig(config)
-    val metricsSystemStreamName = JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name))
-      .toOption
-      .getOrElse(throw new SamzaException("No metrics stream defined in config."))
-
-    val systemStream = StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
-
-    info("Got system stream %s." format systemStream)
-
-    val systemName = systemStream.getSystem
 
+  def getProducer(reporterName: String, config: Config, registry: MetricsRegistryMap): SystemProducer = {

Review comment:
       This kind of relates to @bharathkk's comment: we need to share this producer externally in `DiagnosticsUtil`. See the changes in https://github.com/apache/samza/pull/1369/files
   Do you think having two separate producers for `DiagnosticsManager` and `MetricsSnapshotReporter` would pose a problem?



