Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/14 11:34:44 UTC

[GitHub] [beam] echauchot commented on a change in pull request #13435: [BEAM-11358] Remove unused endOfStreamMonitor from nexmark and remove redundant distributions from monitor

echauchot commented on a change in pull request #13435:
URL: https://github.com/apache/beam/pull/13435#discussion_r535078859



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
##########
@@ -78,7 +78,7 @@
   private static AtomicInteger namespaceCounter = new AtomicInteger(0);
 
   private static final ExpansionServiceClientFactory DEFAULT =
-     DefaultExpansionServiceClientFactory.create(
+      DefaultExpansionServiceClientFactory.create(

Review comment:
       Please revert this change; it is unrelated to this PR.

##########
File path: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
##########
@@ -39,7 +39,6 @@
   final NexmarkConfiguration configuration;
   public final Monitor<Event> eventMonitor;
   public final Monitor<T> resultMonitor;
-  private final Monitor<Event> endOfStreamMonitor;

Review comment:
       +1 on the cleanup

##########
File path: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
##########
@@ -39,20 +39,15 @@
   private class MonitorDoFn extends DoFn<T, T> {
     final Counter elementCounter = Metrics.counter(name, prefix + ".elements");
     final Counter bytesCounter = Metrics.counter(name, prefix + ".bytes");
-    final Distribution startTime = Metrics.distribution(name, prefix + ".startTime");
-    final Distribution endTime = Metrics.distribution(name, prefix + ".endTime");
-    final Distribution startTimestamp = Metrics.distribution(name, prefix + ".startTimestamp");
-    final Distribution endTimestamp = Metrics.distribution(name, prefix + ".endTimestamp");
+    final Distribution systemTimestamp = Metrics.distribution(name, prefix + ".systemTimestamp");

Review comment:
       It is not a timestamp: it is measured in processing time (the wall clock), whereas a timestamp is a component of an element, set by the source. Please rename it to "processingTime".
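
To make the naming distinction concrete, here is a minimal, self-contained sketch. It does not use the Beam API: `StampedEvent`, `processingTimeMillis` and `lagMillis` are hypothetical names chosen for illustration. The event timestamp travels with the element, while processing time is read from the wall clock when the element is handled.

```java
public class TimeDomainsSketch {
    // Hypothetical element type: the source attaches the event timestamp.
    static final class StampedEvent {
        final String payload;
        final long eventTimestampMillis;

        StampedEvent(String payload, long eventTimestampMillis) {
            this.payload = payload;
            this.eventTimestampMillis = eventTimestampMillis;
        }
    }

    // Processing time: the wall clock read at the moment the element is handled.
    static long processingTimeMillis() {
        return System.currentTimeMillis();
    }

    // Difference between the two time domains for one element.
    static long lagMillis(StampedEvent event, long processingTimeMillis) {
        return processingTimeMillis - event.eventTimestampMillis;
    }

    public static void main(String[] args) {
        StampedEvent event = new StampedEvent("bid", 1_000L);
        long now = processingTimeMillis();
        System.out.println("event timestamp = " + event.eventTimestampMillis);
        System.out.println("processing time = " + now);
        System.out.println("lag             = " + lagMillis(event, now));
    }
}
```

Under this reading, a metric fed from `System.currentTimeMillis()` belongs to the processing-time domain, which is why a name such as "processingTime" describes it more accurately than a name containing "timestamp".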

##########
File path: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
##########
@@ -336,7 +336,7 @@ private NexmarkPerf currentPerf(
       perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0;
     }
 
-    if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) {
+    if (eventStart >= 0 && resultStart >= eventStart) {

Review comment:
       Same here: the expression is equivalent and simpler, but less readable. I would prefer a revert.

##########
File path: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
##########
@@ -302,7 +302,7 @@ private NexmarkPerf currentPerf(
       effectiveEnd = eventEnd;
     }
 
-    if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) {
+    if (eventStart >= 0 && effectiveEnd >= eventStart) {

Review comment:
       True, this expression is equivalent and simpler, but it is less readable. I would prefer a revert.
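
The equivalence can be checked with a small standalone sketch. The class and method names below are hypothetical; only the two boolean expressions come from the diff. If `eventStart >= 0` and `effectiveEnd >= eventStart` both hold, then `effectiveEnd >= 0` follows, so the first comparison is redundant.

```java
public class GuardEquivalenceSketch {
    // Guard as written before the change: three comparisons.
    static boolean original(long effectiveEnd, long eventStart) {
        return effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart;
    }

    // Guard as written after the change: the first comparison is implied by the other two.
    static boolean simplified(long effectiveEnd, long eventStart) {
        return eventStart >= 0 && effectiveEnd >= eventStart;
    }

    // Returns true if both guards agree for every pair of values in [lo, hi] x [lo, hi].
    static boolean agreeOnRange(long lo, long hi) {
        for (long end = lo; end <= hi; end++) {
            for (long start = lo; start <= hi; start++) {
                if (original(end, start) != simplified(end, start)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Covers negative, zero and positive values for both operands.
        System.out.println(agreeOnRange(-50, 50)); // prints "true"
    }
}
```

The longer form spells out that both values must be non-negative before they are compared, which is the readability argument for keeping it.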

##########
File path: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
##########
@@ -39,20 +39,15 @@
   private class MonitorDoFn extends DoFn<T, T> {
     final Counter elementCounter = Metrics.counter(name, prefix + ".elements");
     final Counter bytesCounter = Metrics.counter(name, prefix + ".bytes");
-    final Distribution startTime = Metrics.distribution(name, prefix + ".startTime");
-    final Distribution endTime = Metrics.distribution(name, prefix + ".endTime");
-    final Distribution startTimestamp = Metrics.distribution(name, prefix + ".startTimestamp");
-    final Distribution endTimestamp = Metrics.distribution(name, prefix + ".endTimestamp");
+    final Distribution systemTimestamp = Metrics.distribution(name, prefix + ".systemTimestamp");
+    final Distribution eventTimestamp = Metrics.distribution(name, prefix + ".eventTimestamp");
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       elementCounter.inc();
       bytesCounter.inc(c.element().sizeInBytes());
-      long now = System.currentTimeMillis();
-      startTime.update(now);
-      endTime.update(now);
-      startTimestamp.update(c.timestamp().getMillis());
-      endTimestamp.update(c.timestamp().getMillis());
+      systemTimestamp.update(System.currentTimeMillis());

Review comment:
       I agree with your simplification, for these reasons:
   - For each element processed, the start and end distribution metrics are updated with the same value. Since a distribution reports min, max and average, and _MetricsReader_ requests only the minimum and the maximum, using a single distribution has no impact.
   - The methods in _MetricsReader_ use a Spliterator over the distributions, which does not guarantee immutability of the underlying source (this could explain why two distributions were created). However, they only read from it (by calling _getMin()_ and _getMax()_), so there is no side effect on the underlying distribution.
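
The first point can be illustrated with a minimal sketch that does not depend on Beam. `MinMax` is a hypothetical stand-in for a distribution metric that tracks only its minimum and maximum; it is not the Beam `Distribution` interface. Feeding the same values to a start/end pair and to a single instance yields the same bounds.

```java
public class SingleDistributionSketch {
    // Hypothetical stand-in for a distribution metric: tracks min and max only.
    static final class MinMax {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;

        void update(long value) {
            min = Math.min(min, value);
            max = Math.max(max, value);
        }
    }

    // Updates a start/end pair and a single distribution with the same values,
    // then checks that (start.min, end.max) equals (single.min, single.max).
    static boolean sameBounds(long[] values) {
        MinMax start = new MinMax();
        MinMax end = new MinMax();
        MinMax single = new MinMax();
        for (long value : values) {
            start.update(value);
            end.update(value);
            single.update(value);
        }
        return start.min == single.min && end.max == single.max;
    }

    public static void main(String[] args) {
        long[] wallClockMillis = {1700L, 1200L, 1900L, 1500L};
        System.out.println(sameBounds(wallClockMillis)); // prints "true"
    }
}
```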



