Posted to github@beam.apache.org by "ryucc (via GitHub)" <gi...@apache.org> on 2023/04/27 00:32:57 UTC

[GitHub] [beam] ryucc commented on a diff in pull request #26437: Basic Transform metric like throughput, watermark progress & latency support for non data shuffle transforms for Samza Runner

ryucc commented on code in PR #26437:
URL: https://github.com/apache/beam/pull/26437#discussion_r1178507207


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+  // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+  private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
+      avgArrivalTimeMap;
+
+  // Per Transform Metrics for each primitive transform
+  private final BeamTransformMetrics transformMetrics;
+
+  public BeamTransformMetricRegistry() {
+    this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+    this.transformMetrics = new BeamTransformMetrics();
+  }
+
+  public void register(String transformFullName, String pValue, Context ctx) {
+    transformMetrics.register(transformFullName, ctx);
+    // initialize the map for the transform
+    avgArrivalTimeMap.putIfAbsent(transformFullName, new ConcurrentHashMap<>());
+    avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new ConcurrentHashMap<>());
+  }
+
+  public BeamTransformMetrics getTransformMetrics() {
+    return transformMetrics;
+  }
+
+  public void updateArrivalTimeMap(String transformName, String pValue, long watermark, long avg) {
+    if (avgArrivalTimeMap.get(transformName) != null
+        && avgArrivalTimeMap.get(transformName).get(pValue) != null) {
+      ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue =
+          avgArrivalTimeMap.get(transformName).get(pValue);
+      // update the average arrival time for the latest watermark
+      avgArrivalTimeMapForPValue.put(watermark, avg);
+      // remove any stale entries whose watermark is less than the current watermark
+      // todo: check whether this is safe here, since the input metric op may be ahead of the
+      // output in watermark. Why not do it at emission time?
+      avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> entry.getKey() < watermark);
+    }
+  }
+
+  // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+  @SuppressWarnings("return")
+  public void emitLatencyMetric(
+      String transformName,
+      List<String> inputs,
+      List<String> outputs,
+      Long watermark,
+      String taskName) {
+    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
+        avgArrivalTimeMap.get(transformName);
+    if (avgArrivalTimeMapForTransform != null && !inputs.isEmpty() && !outputs.isEmpty()) {

Review Comment:
   Can de-nest with
   
   ```
   if (avgArrivalTimeMapForTransform == null || inputs.isEmpty() || outputs.isEmpty()) {
       return;
   }
   ```
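A minimal sketch of the suggested early-return shape, assuming a trimmed-down stand-in class. `GuardClauseSketch` is hypothetical, and its method returns a `boolean` only so the exit path can be observed; the method in the PR is `void`:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down registry: only the early-return shape mirrors the PR.
class GuardClauseSketch {
  // transformName -> pValue -> (watermark -> avg arrival time), like avgArrivalTimeMap
  private final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
      avgArrivalTimeMap = new ConcurrentHashMap<>();

  void register(String transformName) {
    avgArrivalTimeMap.putIfAbsent(transformName, new ConcurrentHashMap<>());
  }

  /** Returns false on the early-exit path so the sketch is testable (the PR method is void). */
  boolean emitLatencyMetric(String transformName, List<String> inputs, List<String> outputs) {
    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
        avgArrivalTimeMap.get(transformName);
    // Guard clause: handle the "nothing to do" cases first ...
    if (avgArrivalTimeMapForTransform == null || inputs.isEmpty() || outputs.isEmpty()) {
      return false;
    }
    // ... so the main latency logic lives here, one indentation level shallower.
    return true;
  }
}
```

The guard clause handles the unregistered-transform and empty-IO cases up front, so the latency computation no longer sits inside the `if` block.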



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -159,12 +182,29 @@ public MessageStream<OpMessage<String>> getDummyStream() {
   }
 
   public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(PValue pvalue) {
+    return getMessageStream(pvalue, true);
+  }
+
+  public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(

Review Comment:
   There are some articles about the pitfalls of "adding a boolean parameter to change behavior".
   
   Following that advice, `getMessageStream(PValue pvalue)` and `getMessageStreamWithTransformMetric(PValue pvalue)` would be better names and signatures.
   
   https://medium.com/@amlcurran/clean-code-the-curse-of-a-boolean-parameter-c237a830b7a3
   https://alexkondov.com/should-you-pass-boolean-to-functions/
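A minimal sketch of the suggested naming, assuming a hypothetical `MessageStreamLookupSketch` in which plain strings stand in for Samza `MessageStream` objects and a list records which PValues had a metric op attached:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TranslationContext: strings play the role of MessageStream objects.
class MessageStreamLookupSketch {
  // Records which PValues had a (pretend) transform metric op attached.
  private final List<String> attachedMetricOps = new ArrayList<>();

  /** Plain lookup; attaches nothing. */
  String getMessageStream(String pvalue) {
    return lookup(pvalue);
  }

  /** The extra behavior is spelled out in the method name instead of a boolean flag. */
  String getMessageStreamWithTransformMetric(String pvalue) {
    attachedMetricOps.add(pvalue);
    return lookup(pvalue);
  }

  List<String> attachedMetricOps() {
    return attachedMetricOps;
  }

  // Shared implementation detail; callers never see a boolean switch.
  private String lookup(String pvalue) {
    return "stream-for-" + pvalue;
  }
}
```

A call site then reads `getMessageStreamWithTransformMetric(pvalue)` rather than `getMessageStream(pvalue, true)`, so the behavior is visible without looking up what `true` means; the two public methods can still share a private helper.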



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,123 @@
+  // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+  @SuppressWarnings("return")
+  public void emitLatencyMetric(
+      String transformName,
+      List<String> inputs,
+      List<String> outputs,
+      Long watermark,
+      String taskName) {
+    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
+        avgArrivalTimeMap.get(transformName);
+    if (avgArrivalTimeMapForTransform != null && !inputs.isEmpty() && !outputs.isEmpty()) {
+      List<Long> inputPValuesArrivalTimes =
+          inputs.stream()
+              .map(avgArrivalTimeMapForTransform::get)
+              .map(map -> map == null ? null : map.remove(watermark))
+              .filter(time -> time != null)
+              .collect(Collectors.toList());
+
+      List<Long> outputPValuesArrivalTimes =
+          outputs.stream()
+              .map(avgArrivalTimeMapForTransform::get)
+              .map(map -> map == null ? null : map.remove(watermark))

Review Comment:
   Can split to 2 lines for readability.
   
   ```
   .filter(map -> map != null)
   .map(map -> map.remove(watermark))
   ```
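A self-contained sketch of the split pipeline, assuming the same map shape as `avgArrivalTimeMapForTransform` (PValue name to a watermark-keyed map); `ArrivalTimeCollectorSketch` is a hypothetical name:

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class ArrivalTimeCollectorSketch {
  /**
   * For each pValue, removes and returns its average arrival time recorded for the watermark.
   * PValues with no map, or with no entry for that watermark, are skipped.
   */
  static List<Long> removeArrivalTimes(
      Map<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform,
      List<String> pValues,
      long watermark) {
    return pValues.stream()
        .map(avgArrivalTimeMapForTransform::get)
        .filter(Objects::nonNull) // drop pValues that were never registered
        .map(map -> map.remove(watermark)) // null if there is no entry for this watermark
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }
}
```

The second `filter` is still needed because `remove` returns `null` when a registered PValue has no entry for that watermark.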



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,123 @@
+  // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+  @SuppressWarnings("return")
+  public void emitLatencyMetric(
+      String transformName,
+      List<String> inputs,
+      List<String> outputs,
+      Long watermark,
+      String taskName) {
+    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
+        avgArrivalTimeMap.get(transformName);
+    if (avgArrivalTimeMapForTransform != null && !inputs.isEmpty() && !outputs.isEmpty()) {
+      List<Long> inputPValuesArrivalTimes =
+          inputs.stream()
+              .map(avgArrivalTimeMapForTransform::get)
+              .map(map -> map == null ? null : map.remove(watermark))
+              .filter(time -> time != null)
+              .collect(Collectors.toList());
+
+      List<Long> outputPValuesArrivalTimes =
+          outputs.stream()
+              .map(avgArrivalTimeMapForTransform::get)
+              .map(map -> map == null ? null : map.remove(watermark))
+              .filter(time -> time != null)

Review Comment:
   `arrivalTime` might be a more descriptive name than `time` here.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricOp.java:
##########
@@ -0,0 +1,102 @@
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+
+/**
+ * MetricOp for default throughput, latency & watermark progress metric per transform for Beam Samza
+ * Runner. A MetricOp can be either attached to Input PCollection or Output PCollection of a
+ * PTransform.
+ *
+ * <p>A MetricOp is created per primitive PTransform per PCollection across its inputs &
+ * outputs. 1. An independent MetricOp is created and attached to each input PCollection to the
+ * PTransform. 2. An independent MetricOp is created and attached to each output PCollection of the
+ * PTransform.
+ *
+ * <p>Each concrete MetricOp is responsible for following metrics computation: 1. Throughput: Emit
+ * the number of elements processed in the PCollection 2. Watermark Progress: Emit the watermark
+ * progress of the PCollection 3. Latency: Maintain the avg arrival time per watermark across
+ * elements it processes, compute & emit the latency
+ *
+ * @param <T> type of the message
+ */
+public abstract class SamzaMetricOp<T> implements Op<T, T, Void> {
+  // Unique name of the PTransform this MetricOp is associated with
+  protected final String transformFullName;
+  protected final BeamTransformMetricRegistry beamTransformMetricRegistry;
+  // Name or identifier of the PCollection which the PTransform is processing
+  protected final String pValue;
+  // List of input PValue(s) for all PCollections processing the PTransform
+  protected List<String> transformInputs;
+  // List of output PValue(s) for all PCollections processing the PTransform
+  protected List<String> transformOutputs;
+  // Name of the task, for logging purpose
+  protected String task;
+
+  // Some fields are initialized in open() method, which is called after the constructor.
+  @SuppressWarnings("initialization.fields.uninitialized")
+  public SamzaMetricOp(
+      String pValue,
+      String transformFullName,
+      BeamTransformMetricRegistry beamTransformMetricRegistry) {
+    this.transformFullName = transformFullName;
+    this.beamTransformMetricRegistry = beamTransformMetricRegistry;
+    this.pValue = pValue;
+  }
+
+  @Override
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void open(
+      Config config,
+      Context context,
+      Scheduler<KeyedTimerData<Void>> timerRegistry,
+      OpEmitter<T> emitter) {
+    final Map.Entry<String, String> transformInputOutput =
+        PipelineJsonRenderer.getTransformIOMap(config).get(transformFullName);
+    this.transformInputs =
+        transformInputOutput != null
+            ? ioFunc(transformInputOutput.getKey()).get()
+            : new ArrayList();
+    this.transformOutputs =
+        transformInputOutput != null
+            ? ioFunc(transformInputOutput.getValue()).get()
+            : new ArrayList();
+    // for logging / debugging purposes
+    this.task = context.getTaskContext().getTaskModel().getTaskName().getTaskName();
+    // Register the transform with BeamTransformMetricRegistry
+    beamTransformMetricRegistry.register(transformFullName, pValue, context);
+  }
+
+  private static Supplier<List<String>> ioFunc(String ioList) {

Review Comment:
   Why not just use a normal `private static List<String> ioFunc(String ioList)`, instead of the Supplier?
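A sketch of the plain-method version, assuming the IO list is one delimiter-joined string. The `","` delimiter and the `IoListParsingSketch` class are hypothetical stand-ins (the PR's actual format is not shown in this hunk), and the method is package-private here only so it can be exercised directly:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class IoListParsingSketch {
  // Hypothetical delimiter; not necessarily what PipelineJsonRenderer uses.
  private static final String IO_DELIMITER = ",";

  /** Returns the parsed list directly; no Supplier wrapper since the value is used right away. */
  static List<String> ioFunc(String ioList) {
    return Arrays.stream(ioList.split(IO_DELIMITER))
        .filter(item -> !item.isEmpty()) // skip empty segments, e.g. from "" or ",,"
        .collect(Collectors.toList());
  }
}
```

Callers in `open()` would then drop the trailing `.get()`, e.g. `ioFunc(transformInputOutput.getKey())`. A `Supplier` mainly pays off when evaluation should be deferred or repeated, which does not seem to be the case here since the value is consumed immediately.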



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,123 @@
+  // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+  @SuppressWarnings("return")
+  public void emitLatencyMetric(
+      String transformName,
+      List<String> inputs,
+      List<String> outputs,
+      Long watermark,
+      String taskName) {
+    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
+        avgArrivalTimeMap.get(transformName);
+    if (avgArrivalTimeMapForTransform != null && !inputs.isEmpty() && !outputs.isEmpty()) {
+      List<Long> inputPValuesArrivalTimes =
+          inputs.stream()
+              .map(avgArrivalTimeMapForTransform::get)
+              .map(map -> map == null ? null : map.remove(watermark))
+              .filter(time -> time != null)
+              .collect(Collectors.toList());
+
+      List<Long> outputPValuesArrivalTimes =
+          outputs.stream()
+              .map(avgArrivalTimeMapForTransform::get)
+              .map(map -> map == null ? null : map.remove(watermark))
+              .filter(time -> time != null)
+              .collect(Collectors.toList());
+
+      if (!inputPValuesArrivalTimes.isEmpty() && !outputPValuesArrivalTimes.isEmpty()) {

Review Comment:
   Can also de-nest by putting the fail case first.
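A sketch of the inner check with the fail case first. The latency formula (latest output arrival time minus earliest input arrival time) is an assumption for illustration, since the hunk ends before the computation; `LatencySketch` is hypothetical:

```java
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

class LatencySketch {
  /**
   * Returns empty when either side has no arrival time for this watermark (the fail case);
   * otherwise an assumed latency of latest output arrival minus earliest input arrival.
   */
  static OptionalLong computeLatency(List<Long> inputArrivalTimes, List<Long> outputArrivalTimes) {
    // Fail case first: nothing to compare, so exit before the main logic.
    if (inputArrivalTimes.isEmpty() || outputArrivalTimes.isEmpty()) {
      return OptionalLong.empty();
    }
    long startTime = Collections.min(inputArrivalTimes);
    long endTime = Collections.max(outputArrivalTimes);
    return OptionalLong.of(endTime - startTime);
  }
}
```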



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -283,4 +323,13 @@ public StoreIdGenerator getStoreIdGenerator() {
     sendFn.accept(new EndOfStreamMessage(null));
     return dummyInput;
   }
+
+  boolean doAttachMetricOp(Config config, boolean enableTransformMetric) {

Review Comment:
   Is `shouldDoAttachMetricOp` a better name?
   
   I feel `doAttachMetricOp` reads like the name of a method that performs the actual attach action.
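A small sketch of the convention the comment alludes to: a `should…` predicate that only answers a question, next to a verb-named method that performs the action. `MetricOpAttachSketch`, its config key, and the map-based config are hypothetical stand-ins for the Samza `Config` used in the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class MetricOpAttachSketch {
  // Hypothetical config key for illustration only; not a real Beam or Samza option.
  static final String METRICS_ENABLED_KEY = "example.transform.metrics.enabled";

  final List<String> attached = new ArrayList<>();

  /** Predicate: the "should" prefix signals it only decides and has no side effects. */
  static boolean shouldDoAttachMetricOp(Map<String, String> config, boolean enableTransformMetric) {
    return enableTransformMetric
        && Boolean.parseBoolean(config.getOrDefault(METRICS_ENABLED_KEY, "false"));
  }

  /** Action: the verb-first name is reserved for the method that actually does the work. */
  void doAttachMetricOp(String pValue) {
    attached.add(pValue);
  }
}
```

With the predicate renamed, `doAttachMetricOp` is free for a method that actually attaches the op.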



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaInputMetricOp.java:
##########
@@ -0,0 +1,89 @@
+package org.apache.beam.runners.samza.runtime;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaInputMetricOp is a {@link SamzaMetricOp} that emits & maintains default transform metrics
+ * for input PCollection to the transform. It emits the input throughput and maintains avg arrival
+ * time for input PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, OpEmitter)} is exclusive of
+ * {@code SamzaInputMetricOp#processElement(WindowedValue, OpEmitter)}. Specifically, the processWatermark
+ * method assumes that no calls to processElement will be made during its execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+public class SamzaInputMetricOp<T> extends SamzaMetricOp<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaInputMetricOp.class);
+  // Counters to maintain avg arrival time per watermark for input PCollection.
+  private AtomicLong count;
+  private AtomicReference<BigInteger> sumOfTimestamps;
+
+  public SamzaInputMetricOp(
+      String pValue,
+      String transformFullName,
+      BeamTransformMetricRegistry beamTransformMetricRegistry) {
+    super(pValue, transformFullName, beamTransformMetricRegistry);
+    this.count = new AtomicLong(0L);
+    this.sumOfTimestamps = new AtomicReference<>(BigInteger.ZERO);
+  }
+
+  @Override
+  public void processElement(WindowedValue<T> inputElement, OpEmitter<T> emitter) {
+    count.incrementAndGet();
+    sumOfTimestamps.updateAndGet(sum -> sum.add(BigInteger.valueOf(System.nanoTime())));
+    beamTransformMetricRegistry
+        .getTransformMetrics()
+        .getTransformInputThroughput(transformFullName)
+        .inc();
+    emitter.emitElement(inputElement);
+  }
+
+  @Override
+  public void processWatermark(Instant watermark, OpEmitter<T> emitter) {
+    if (LOG.isDebugEnabled()) {

Review Comment:
   We only need this guard when the log message is expensive to construct, no?
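A sketch of when the guard matters, using the JDK's `java.util.logging` so it runs without extra dependencies (`LogGuardSketch` and `expensiveSummary` are hypothetical). Parameterized messages defer formatting, and SLF4J's `LOG.debug("... {}", arg)` behaves similarly, but arguments are still evaluated before the call, so an explicit level check only pays off when computing them is costly:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

class LogGuardSketch {
  private static final Logger LOG = Logger.getLogger(LogGuardSketch.class.getName());
  static final AtomicInteger expensiveCalls = new AtomicInteger();

  // Stand-in for building a costly debug message (e.g. dumping a large map).
  static String expensiveSummary() {
    expensiveCalls.incrementAndGet();
    return "summary";
  }

  static void processWatermark(long watermark) {
    // Cheap argument: no guard needed; formatting happens only if the record is published.
    LOG.log(Level.FINE, "Watermark {0}", watermark);

    // Costly argument: it would be computed even at a disabled level, so check first.
    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine("State: " + expensiveSummary());
    }
  }
}
```

In `java.util.logging`, `Logger.fine(Supplier<String>)` is another way to defer an expensive message without an explicit `if`.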

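The `SamzaInputMetricOp` hunk accumulates an element count and a `BigInteger` sum of `System.nanoTime()` readings. A minimal sketch of turning those counters into a per-watermark average, assuming the average is `sum / count` and both counters are reset at each watermark (the `processWatermark` body is cut off in the hunk, so this is not the PR's code); `AvgArrivalTimeSketch` is hypothetical and takes the timestamp as a parameter so it can be tested:

```java
import java.math.BigInteger;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class AvgArrivalTimeSketch {
  private final AtomicLong count = new AtomicLong(0L);
  private final AtomicReference<BigInteger> sumOfTimestamps =
      new AtomicReference<>(BigInteger.ZERO);

  /** Like processElement: record one arrival timestamp (System.nanoTime() in the PR). */
  void onElement(long arrivalNanos) {
    count.incrementAndGet();
    sumOfTimestamps.updateAndGet(sum -> sum.add(BigInteger.valueOf(arrivalNanos)));
  }

  /** Assumed watermark step: average arrival time since the last watermark, then reset. */
  OptionalLong onWatermark() {
    long n = count.getAndSet(0L);
    BigInteger sum = sumOfTimestamps.getAndSet(BigInteger.ZERO);
    if (n == 0) {
      return OptionalLong.empty(); // no elements arrived for this watermark
    }
    return OptionalLong.of(sum.divide(BigInteger.valueOf(n)).longValue());
  }
}
```

The two `getAndSet` calls are not atomic as a pair, so this relies on the same assumption stated in the class Javadoc: watermark processing never overlaps element processing. The `BigInteger` sum avoids `long` overflow when many large nanosecond readings are added.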

