Posted to github@beam.apache.org by "xinyuiscool (via GitHub)" <gi...@apache.org> on 2023/04/28 23:51:23 UTC

[GitHub] [beam] xinyuiscool commented on a diff in pull request #26437: #26456 : Basic Tranform metric like throughput, watermark progress & latency support for non data shuffle transforms for Samza Runner

xinyuiscool commented on code in PR #26437:
URL: https://github.com/apache/beam/pull/26437#discussion_r1180867475


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/TestSamzaRunner.java:
##########
@@ -30,22 +31,33 @@
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.commons.io.FileUtils;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Test {@link SamzaRunner}. */
 public class TestSamzaRunner extends PipelineRunner<PipelineResult> {
 
   private final SamzaRunner delegate;
+  private static InMemoryMetricsReporter testMetricsReporter = new InMemoryMetricsReporter();

Review Comment:
   I think we should add this reporter to the SamzaPipelineOptions when running the corresponding tests, instead of hard-coding it in TestSamzaRunner. I don't think we need to change the test runner code.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+  // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+  private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>

Review Comment:
   Why do we need the transform name here? If you just keep {PValue_id -> {watermark -> time}}, it seems it should still work.
   
   Mark it final.
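A minimal sketch of the flattened {PValue_id -> {watermark -> time}} layout suggested above, with the field marked `final`. The class `ArrivalTimeStore` and its methods are hypothetical, not part of the PR, and a `ConcurrentSkipListMap` is swapped in for the innermost map (an assumption) so stale watermarks can be pruned with `headMap(...).clear()` instead of `removeIf`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical store: PValue id -> (watermark -> average arrival time), no transform level. */
class ArrivalTimeStore {
  // final, as requested in the review: the reference never changes after construction.
  private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> avgArrivalTimeMap =
      new ConcurrentHashMap<>();

  /** Creates the per-PValue map once; safe to call repeatedly. */
  void register(String pValue) {
    avgArrivalTimeMap.putIfAbsent(pValue, new ConcurrentSkipListMap<>());
  }

  /** Records the average arrival time for a watermark and drops strictly older watermarks. */
  void update(String pValue, long watermark, long avg) {
    ConcurrentSkipListMap<Long, Long> perWatermark = avgArrivalTimeMap.get(pValue);
    if (perWatermark == null) {
      return; // PValue was never registered
    }
    perWatermark.put(watermark, avg);
    // headMap(k) excludes k itself, so only entries older than the watermark are cleared.
    perWatermark.headMap(watermark).clear();
  }

  /** Removes and returns the average for a watermark, or null if it is absent. */
  Long take(String pValue, long watermark) {
    ConcurrentSkipListMap<Long, Long> perWatermark = avgArrivalTimeMap.get(pValue);
    return perWatermark == null ? null : perWatermark.remove(watermark);
  }

  public static void main(String[] args) {
    ArrivalTimeStore store = new ArrivalTimeStore();
    store.register("pc1");
    store.update("pc1", 10L, 100L);
    store.update("pc1", 20L, 200L); // evicts watermark 10
    System.out.println(store.take("pc1", 10L)); // null
    System.out.println(store.take("pc1", 20L)); // 200
  }
}
```

One thing to check before adopting this layout: a PCollection consumed by more than one transform would then share a single entry across those transforms' metric ops.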



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaInputMetricOp.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaInputMetricOp is a {@link SamzaMetricOp} that emits & maintains default transform metrics
+ * for input PCollection to the transform. It emits the input throughput and maintains avg arrival
+ * time for input PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, OpEmitter)} is exclusive of
+ * {@code SamzaInputMetricOp#processElement(Instant, OpEmitter)}. Specifically, the processWatermark
+ * method assumes that no calls to processElement will be made during its execution, and vice versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+public class SamzaInputMetricOp<T> extends SamzaMetricOp<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaInputMetricOp.class);
+  // Counters to maintain avg arrival time per watermark for input PCollection.
+  private AtomicLong count;

Review Comment:
   final
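A JDK-only sketch of how the per-watermark average arrival time described in the `SamzaInputMetricOp` Javadoc could be kept with `final` counters. It mirrors the PR's imports (`AtomicLong`, `AtomicReference<BigInteger>`), but `ArrivalTimeTracker`, its methods, and the `-1` "no elements" sentinel are hypothetical, and the Samza `Op`/`OpEmitter` plumbing is omitted.

```java
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the arrival-time bookkeeping in SamzaInputMetricOp. */
class ArrivalTimeTracker {
  // final: the references are assigned once; only their contents change.
  private final AtomicLong count = new AtomicLong(0L);
  private final AtomicReference<BigInteger> sumOfArrivalTimes =
      new AtomicReference<>(BigInteger.ZERO);

  /** Called per element with its arrival time (for example, System.nanoTime()). */
  void onElement(long arrivalTime) {
    count.incrementAndGet();
    // BigInteger avoids overflow when summing many large timestamps.
    sumOfArrivalTimes.accumulateAndGet(BigInteger.valueOf(arrivalTime), BigInteger::add);
  }

  /**
   * Called on a watermark: returns the average arrival time since the previous watermark, or -1
   * if no elements arrived, and resets the counters. Like the Javadoc above, this assumes no
   * concurrent onElement calls while it runs.
   */
  long onWatermark() {
    long n = count.getAndSet(0L);
    BigInteger sum = sumOfArrivalTimes.getAndSet(BigInteger.ZERO);
    return n == 0 ? -1L : sum.divide(BigInteger.valueOf(n)).longValue();
  }

  public static void main(String[] args) {
    ArrivalTimeTracker tracker = new ArrivalTimeTracker();
    tracker.onElement(100L);
    tracker.onElement(300L);
    System.out.println(tracker.onWatermark()); // 200
    System.out.println(tracker.onWatermark()); // -1
  }
}
```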



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -145,9 +152,25 @@ protected <KeyT, OutT> void registerInputMessageStreams(
   }
 
   public <OutT> void registerMessageStream(PValue pvalue, MessageStream<OpMessage<OutT>> stream) {
+    registerMessageStream(pvalue, stream, true);
+  }
+
+  public <OutT> void registerMessageStream(
+      PValue pvalue, MessageStream<OpMessage<OutT>> stream, boolean enableTransformMetric) {
     if (messsageStreams.containsKey(pvalue)) {
       throw new IllegalArgumentException("Stream already registered for pvalue: " + pvalue);
     }
+    // add a step to attach OutputMetricOp if registered for Op Stream
+    final Config overrideConfig = new MapConfig(getPipelineOptions().getConfigOverride());

Review Comment:
   Don't use config override. Use PipelineOptions. We have an option to enable/disable metrics; please add one more for TransformMetrics.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PipelineJsonRenderer.java:
##########
@@ -292,4 +302,29 @@ static Map<String, Map.Entry<String, String>> buildTransformIOMap(
   private static Supplier<List<String>> getIOPValueList(Map<TupleTag<?>, PCollection<?>> map) {
     return () -> map.values().stream().map(pColl -> pColl.getName()).collect(Collectors.toList());
   }
+
+  // Reads the config to build transformIOMap, i.e. map of inputs & output PValues for each
+  // PTransform
+  public static Map<String, Map.Entry<String, String>> getTransformIOMap(Config config) {
+    checkNotNull(config, "Config cannot be null");
+    final Map<String, Map.Entry<String, String>> result = new HashMap<>();
+    final String pipelineJsonGraph = config.get(SamzaRunner.BEAM_JSON_GRAPH);
+    if (pipelineJsonGraph == null) {
+      LOG.warn(
+          "Cannot build transformIOMap since Config: {} is found null ",
+          SamzaRunner.BEAM_JSON_GRAPH);
+      return result;
+    }
+    JsonObject jsonObject = JsonParser.parseString(pipelineJsonGraph).getAsJsonObject();
+    JsonArray transformIOInfo = jsonObject.getAsJsonArray("transformIOInfo");
+    transformIOInfo.forEach(
+        transform -> {
+          final String transformName =
+              transform.getAsJsonObject().get("transformName").getAsString();
+          final String inputs = transform.getAsJsonObject().get("inputs").getAsString();

Review Comment:
   Can we directly get the inputs/outputs as a list here? Serialize/deserialize the JSON as a list instead of manipulating the delimiter.
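Why a delimiter-based encoding is fragile can be shown with the JDK alone. In this sketch the `,` delimiter and the PValue names are hypothetical: a name that contains the delimiter does not survive a join/split round trip, whereas a JSON array, as suggested above, keeps each element intact.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class DelimiterRoundTrip {
  /** Joins names with a delimiter and splits them back, as a delimiter-based encoding does. */
  static List<String> roundTrip(List<String> names, String delimiter) {
    String encoded = String.join(delimiter, names);
    // split() takes a regex, so quote the delimiter to treat it literally.
    return Arrays.asList(encoded.split(Pattern.quote(delimiter)));
  }

  public static void main(String[] args) {
    List<String> plain = Arrays.asList("Read.out", "ParDo.out");
    List<String> withComma = Arrays.asList("Read.out", "Combine(a,b).out"); // hypothetical name
    System.out.println(roundTrip(plain, ",").equals(plain)); // true
    System.out.println(roundTrip(withComma, ",").equals(withComma)); // false: 3 elements come back
  }
}
```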



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+  // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+  private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
+      avgArrivalTimeMap;
+
+  // Per Transform Metrics for each primitive transform
+  private final BeamTransformMetrics transformMetrics;
+
+  public BeamTransformMetricRegistry() {
+    this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+    this.transformMetrics = new BeamTransformMetrics();
+  }
+
+  public void register(String transformFullName, String pValue, Context ctx) {
+    transformMetrics.register(transformFullName, ctx);
+    // initialize the map for the transform
+    avgArrivalTimeMap.putIfAbsent(transformFullName, new ConcurrentHashMap<>());
+    avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new ConcurrentHashMap<>());
+  }
+
+  public BeamTransformMetrics getTransformMetrics() {
+    return transformMetrics;
+  }
+
+  public void updateArrivalTimeMap(String transformName, String pValue, long watermark, long avg) {
+    if (avgArrivalTimeMap.get(transformName) != null
+        && avgArrivalTimeMap.get(transformName).get(pValue) != null) {
+      ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue =
+          avgArrivalTimeMap.get(transformName).get(pValue);
+      // update the average arrival time for the latest watermark
+      avgArrivalTimeMapForPValue.put(watermark, avg);
+      // remove any stale entries which are lesser than the watermark
+      // todo: check is this safe to do here input metric op may be ahead in watermark than output?
+      // why not do it at emission time?
+      avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> entry.getKey() < watermark);
+    }
+  }
+
+  // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+  @SuppressWarnings("return")
+  public void emitLatencyMetric(
+      String transformName,
+      List<String> inputs,
+      List<String> outputs,
+      Long watermark,
+      String taskName) {
+    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
+        avgArrivalTimeMap.get(transformName);
+
+    if (avgArrivalTimeMapForTransform == null || inputs.isEmpty() || outputs.isEmpty()) {
+      return;
+    }
+
+    // get the avg arrival times for all the input PValues
+    List<Long> inputPValuesAvgArrivalTimes =
+        inputs.stream()
+            .map(avgArrivalTimeMapForTransform::get)
+            .map(map -> map == null ? null : map.remove(watermark))
+            .filter(avgArrivalTime -> avgArrivalTime != null)
+            .collect(Collectors.toList());
+
+    // get the avg arrival times for all the output PValues
+    List<Long> outputPValuesAvgArrivalTimes =

Review Comment:
   final



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+  // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+  private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
+      avgArrivalTimeMap;
+
+  // Per Transform Metrics for each primitive transform
+  private final BeamTransformMetrics transformMetrics;
+
+  public BeamTransformMetricRegistry() {
+    this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+    this.transformMetrics = new BeamTransformMetrics();
+  }
+
+  public void register(String transformFullName, String pValue, Context ctx) {
+    transformMetrics.register(transformFullName, ctx);
+    // initialize the map for the transform
+    avgArrivalTimeMap.putIfAbsent(transformFullName, new ConcurrentHashMap<>());
+    avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new ConcurrentHashMap<>());
+  }
+
+  public BeamTransformMetrics getTransformMetrics() {
+    return transformMetrics;
+  }
+
+  public void updateArrivalTimeMap(String transformName, String pValue, long watermark, long avg) {
+    if (avgArrivalTimeMap.get(transformName) != null
+        && avgArrivalTimeMap.get(transformName).get(pValue) != null) {
+      ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue =
+          avgArrivalTimeMap.get(transformName).get(pValue);
+      // update the average arrival time for the latest watermark
+      avgArrivalTimeMapForPValue.put(watermark, avg);
+      // remove any stale entries which are lesser than the watermark
+      // todo: check is this safe to do here input metric op may be ahead in watermark than output?
+      // why not do it at emission time?
+      avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> entry.getKey() < watermark);
+    }
+  }
+
+  // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+  @SuppressWarnings("return")
+  public void emitLatencyMetric(
+      String transformName,
+      List<String> inputs,
+      List<String> outputs,
+      Long watermark,
+      String taskName) {
+    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =

Review Comment:
   final



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+  // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+  private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
+      avgArrivalTimeMap;
+
+  // Per Transform Metrics for each primitive transform
+  private final BeamTransformMetrics transformMetrics;
+
+  public BeamTransformMetricRegistry() {
+    this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+    this.transformMetrics = new BeamTransformMetrics();
+  }
+
+  public void register(String transformFullName, String pValue, Context ctx) {
+    transformMetrics.register(transformFullName, ctx);
+    // initialize the map for the transform
+    avgArrivalTimeMap.putIfAbsent(transformFullName, new ConcurrentHashMap<>());
+    avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new ConcurrentHashMap<>());
+  }
+
+  public BeamTransformMetrics getTransformMetrics() {
+    return transformMetrics;
+  }
+
+  public void updateArrivalTimeMap(String transformName, String pValue, long watermark, long avg) {
+    if (avgArrivalTimeMap.get(transformName) != null
+        && avgArrivalTimeMap.get(transformName).get(pValue) != null) {
+      ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue =
+          avgArrivalTimeMap.get(transformName).get(pValue);
+      // update the average arrival time for the latest watermark
+      avgArrivalTimeMapForPValue.put(watermark, avg);
+      // remove any stale entries which are lesser than the watermark
+      // todo: check is this safe to do here input metric op may be ahead in watermark than output?
+      // why not do it at emission time?
+      avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> entry.getKey() < watermark);
+    }
+  }
+
+  // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+  @SuppressWarnings("return")
+  public void emitLatencyMetric(
+      String transformName,
+      List<String> inputs,
+      List<String> outputs,
+      Long watermark,
+      String taskName) {
+    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
+        avgArrivalTimeMap.get(transformName);
+
+    if (avgArrivalTimeMapForTransform == null || inputs.isEmpty() || outputs.isEmpty()) {
+      return;
+    }
+
+    // get the avg arrival times for all the input PValues
+    List<Long> inputPValuesAvgArrivalTimes =
+        inputs.stream()
+            .map(avgArrivalTimeMapForTransform::get)
+            .map(map -> map == null ? null : map.remove(watermark))
+            .filter(avgArrivalTime -> avgArrivalTime != null)
+            .collect(Collectors.toList());
+
+    // get the avg arrival times for all the output PValues
+    List<Long> outputPValuesAvgArrivalTimes =
+        outputs.stream()
+            .map(avgArrivalTimeMapForTransform::get)
+            .map(map -> map == null ? null : map.remove(watermark))
+            .filter(avgArrivalTime -> avgArrivalTime != null)
+            .collect(Collectors.toList());
+
+    if (inputPValuesAvgArrivalTimes.isEmpty() || outputPValuesAvgArrivalTimes.isEmpty()) {
+      LOG.debug(
+          "Failure to Emit Metric for Transform: {} inputArrivalTime: {} or outputArrivalTime: {} not found for Watermark: {} Task: {}",
+          transformName,
+          inputPValuesAvgArrivalTimes,
+          outputPValuesAvgArrivalTimes,
+          watermark,
+          taskName);
+      return;
+    }
+
+    long startTime = Collections.min(inputPValuesAvgArrivalTimes);

Review Comment:
   final for all vars below.
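A JDK-only sketch of the tail of `emitLatencyMetric` with every local marked `final`. The diff is cut off right after `startTime`, so taking the latest output average as the end time and reporting `endTime - startTime` is an assumption, and `computeLatency` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

class LatencySketch {
  /**
   * Hypothetical helper: earliest input average arrival time to latest output average arrival
   * time. Returns empty when either side has no data for the watermark.
   */
  static OptionalLong computeLatency(
      List<Long> inputAvgArrivalTimes, List<Long> outputAvgArrivalTimes) {
    if (inputAvgArrivalTimes.isEmpty() || outputAvgArrivalTimes.isEmpty()) {
      return OptionalLong.empty();
    }
    final long startTime = Collections.min(inputAvgArrivalTimes);
    final long endTime = Collections.max(outputAvgArrivalTimes); // assumption: latest output
    final long latency = endTime - startTime;
    return OptionalLong.of(latency);
  }

  public static void main(String[] args) {
    System.out.println(
        computeLatency(Arrays.asList(100L, 120L), Arrays.asList(150L, 180L))); // OptionalLong[80]
    System.out.println(
        computeLatency(Collections.emptyList(), Arrays.asList(150L))); // OptionalLong.empty
  }
}
```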



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamTransformMetricRegistry.class);
+
+  // TransformName -> PValue for pCollection -> Map<WatermarkId, AvgArrivalTime>
+  private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>>>
+      avgArrivalTimeMap;
+
+  // Per Transform Metrics for each primitive transform
+  private final BeamTransformMetrics transformMetrics;
+
+  public BeamTransformMetricRegistry() {
+    this.avgArrivalTimeMap = new ConcurrentHashMap<>();
+    this.transformMetrics = new BeamTransformMetrics();
+  }
+
+  public void register(String transformFullName, String pValue, Context ctx) {
+    transformMetrics.register(transformFullName, ctx);
+    // initialize the map for the transform
+    avgArrivalTimeMap.putIfAbsent(transformFullName, new ConcurrentHashMap<>());
+    avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new ConcurrentHashMap<>());
+  }
+
+  public BeamTransformMetrics getTransformMetrics() {
+    return transformMetrics;
+  }
+
+  public void updateArrivalTimeMap(String transformName, String pValue, long watermark, long avg) {
+    if (avgArrivalTimeMap.get(transformName) != null
+        && avgArrivalTimeMap.get(transformName).get(pValue) != null) {
+      ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue =
+          avgArrivalTimeMap.get(transformName).get(pValue);
+      // update the average arrival time for the latest watermark
+      avgArrivalTimeMapForPValue.put(watermark, avg);
+      // remove any stale entries which are lesser than the watermark
+      // todo: check is this safe to do here input metric op may be ahead in watermark than output?
+      // why not do it at emission time?
+      avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> entry.getKey() < watermark);
+    }
+  }
+
+  // Checker framework bug: https://github.com/typetools/checker-framework/issues/979
+  @SuppressWarnings("return")
+  public void emitLatencyMetric(
+      String transformName,
+      List<String> inputs,
+      List<String> outputs,
+      Long watermark,
+      String taskName) {
+    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform =
+        avgArrivalTimeMap.get(transformName);
+
+    if (avgArrivalTimeMapForTransform == null || inputs.isEmpty() || outputs.isEmpty()) {
+      return;
+    }
+
+    // get the avg arrival times for all the input PValues
+    List<Long> inputPValuesAvgArrivalTimes =

Review Comment:
   final



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/TestSamzaRunner.java:
##########
@@ -91,4 +105,27 @@ public PipelineResult run(Pipeline pipeline) {
       throw t;
     }
   }
+
+  public static class InMemoryMetricsReporter implements MetricsReporter {

Review Comment:
   Move this out into a util package or similar. Add Javadoc for every class.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricOp.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+
+/**
+ * MetricOp for default throughput, latency & watermark progress metric per transform for Beam Samza
+ * Runner. A MetricOp can be either attached to Input PCollection or Output PCollection of a
+ * PTransform.
+ *
+ * <p>A MetricOp is created per primitive PTransform per PCollection across its inputs &
+ * outputs. 1. An independent MetricOp is created and attached to each input PCollection of the
+ * PTransform. 2. An independent MetricOp is created and attached to each output PCollection of
+ * the PTransform.
+ *
+ * <p>Each concrete MetricOp is responsible for following metrics computation: 1. Throughput: Emit
+ * the number of elements processed in the PCollection 2. Watermark Progress: Emit the watermark
+ * progress of the PCollection 3. Latency: Maintain the avg arrival time per watermark across
+ * elements it processes, compute & emit the latency
+ *
+ * @param <T> type of the message
+ */
+public abstract class SamzaMetricOp<T> implements Op<T, T, Void> {
+  // Unique name of the PTransform this MetricOp is associated with
+  protected final String transformFullName;
+  protected final BeamTransformMetricRegistry beamTransformMetricRegistry;
+  // Name or identifier of the PCollection which the PTransform is processing
+  protected final String pValue;
+  // List of input PValue(s) for all PCollections processing the PTransform
+  protected List<String> transformInputs;

Review Comment:
   transient
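Why `transient` helps here: op instances are serialized, and a field rebuilt at runtime does not need to travel with them. A JDK-only sketch, assuming `transformInputs` is filled in during the op's `init` (the `Config`/`PipelineJsonRenderer` imports suggest it comes from the pipeline JSON graph). `MetricOpSketch` and its `init` are hypothetical stand-ins for `SamzaMetricOp`: the transient list comes back `null` after deserialization and is repopulated on init.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for SamzaMetricOp; not the PR's class. */
class MetricOpSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  // Set at construction and needed after deserialization: serialized normally.
  final String pValue;
  // Rebuilt at runtime, so it is skipped during serialization.
  transient List<String> transformInputs;

  MetricOpSketch(String pValue) {
    this.pValue = pValue;
  }

  /** Stand-in for Op#init: recompute runtime-only state (assumed to come from the config). */
  void init(List<String> inputsFromConfig) {
    this.transformInputs = inputsFromConfig;
  }

  /** Serializes and deserializes an object with Java serialization. */
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    MetricOpSketch op = new MetricOpSketch("pc1");
    op.init(Arrays.asList("pc0"));
    MetricOpSketch copy = roundTrip(op);
    System.out.println(copy.pValue); // pc1
    System.out.println(copy.transformInputs); // null
    copy.init(Arrays.asList("pc0"));
    System.out.println(copy.transformInputs); // [pc0]
  }
}
```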



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetrics.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SlidingTimeWindowReservoir;
+import org.apache.samza.metrics.Timer;
+
+/**
+ * Metrics like throughput, latency and watermark progress for each Beam transform for Samza Runner.
+ */
+@SuppressWarnings("return")
+public class BeamTransformMetrics implements Serializable {

Review Comment:
   s/Beam/Samza
   
   Same question: this class seems to have a few ConcurrentHashMaps; are those serializable?
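On the serializability question: `java.util.concurrent.ConcurrentHashMap` does implement `Serializable`, so the map itself is not the issue; serialization succeeds only if every key and value is serializable too. Whether the Samza `Counter`, `Gauge` and `Timer` objects stored in it are serializable is not visible in this diff and should be checked. In the JDK-only sketch below, `NonSerializableMetric` is a hypothetical stand-in for such a value.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.concurrent.ConcurrentHashMap;

class MapSerializabilityCheck {
  /** Hypothetical metric type that, like many runtime objects, is not Serializable. */
  static final class NonSerializableMetric {
    long value;
  }

  /** Returns true if the object can be written with Java serialization. */
  static boolean isJavaSerializable(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false; // some reachable object does not implement Serializable
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ConcurrentHashMap<String, Long> plain = new ConcurrentHashMap<>();
    plain.put("transformA", 42L);
    ConcurrentHashMap<String, NonSerializableMetric> metrics = new ConcurrentHashMap<>();
    metrics.put("transformA", new NonSerializableMetric());
    System.out.println(isJavaSerializable(plain)); // true
    System.out.println(isJavaSerializable(metrics)); // false
  }
}
```

If the values turn out not to be serializable, one option is to mark such fields `transient` and rebuild them when the metrics are registered.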



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaInputMetricOp.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaInputMetricOp is a {@link SamzaMetricOp} that emits & maintains default transform metrics
+ * for input PCollection to the transform. It emits the input throughput and maintains avg arrival
+ * time for input PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, OpEmitter)} is mutually
+ * exclusive with {@code SamzaInputMetricOp#processElement(WindowedValue, OpEmitter)}. Specifically,
+ * processWatermark assumes that no calls to processElement are made during its execution, and vice
+ * versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+public class SamzaInputMetricOp<T> extends SamzaMetricOp<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaInputMetricOp.class);
+  // Counters to maintain avg arrival time per watermark for input PCollection.
+  private AtomicLong count;
+  private AtomicReference<BigInteger> sumOfTimestamps;

Review Comment:
   Make these fields final.
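   This sketch applies the `final` suggestion to the two counters. It also shows the per-watermark average arrival time that the class Javadoc describes. Because the fields are `final` and initialized at declaration, only their contents ever change. `ArrivalTimeTracker`, `onElement`, `onWatermark`, and the `-1` "no elements" sentinel are hypothetical. The `BigInteger` sum is presumably there to avoid `long` overflow when many nanosecond timestamps are added together.

```java
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper mirroring the count/sumOfTimestamps fields of SamzaInputMetricOp.
class ArrivalTimeTracker {
  // final: the references never change; only the values they hold are updated.
  private final AtomicLong count = new AtomicLong();
  private final AtomicReference<BigInteger> sumOfTimestamps =
      new AtomicReference<>(BigInteger.ZERO);

  // Called once per element with its arrival time (for example System.nanoTime()).
  void onElement(long arrivalTime) {
    count.incrementAndGet();
    sumOfTimestamps.updateAndGet(sum -> sum.add(BigInteger.valueOf(arrivalTime)));
  }

  // Called once per watermark: returns the average arrival time of the elements seen since
  // the previous watermark (or -1 if there were none) and resets both counters.
  long onWatermark() {
    long n = count.getAndSet(0);
    BigInteger sum = sumOfTimestamps.getAndSet(BigInteger.ZERO);
    return n == 0 ? -1L : sum.divide(BigInteger.valueOf(n)).longValue();
  }
}
```

   The two `getAndSet` calls in `onWatermark` are not atomic as a pair. That is acceptable only under the Javadoc's assumption that watermark processing and element processing never overlap.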



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaInputMetricOp.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaInputMetricOp is a {@link SamzaMetricOp} that emits & maintains default transform metrics
+ * for input PCollection to the transform. It emits the input throughput and maintains avg arrival
+ * time for input PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, OpEmitter)} is mutually
+ * exclusive with {@code SamzaInputMetricOp#processElement(WindowedValue, OpEmitter)}. Specifically,
+ * processWatermark assumes that no calls to processElement are made during its execution, and vice
+ * versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+public class SamzaInputMetricOp<T> extends SamzaMetricOp<T> {

Review Comment:
   Move this class and the other related metric ops to the metrics package.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricOp.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+
+/**
+ * MetricOp for default throughput, latency & watermark progress metric per transform for Beam Samza
+ * Runner. A MetricOp can be either attached to Input PCollection or Output PCollection of a
+ * PTransform.
+ *
+ * <p>A MetricOp is created per primitive PTransform, per PCollection across its inputs and
+ * outputs: 1. An independent MetricOp is created and attached to each input PCollection of the
+ * PTransform. 2. An independent MetricOp is created and attached to each output PCollection of the
+ * PTransform.
+ *
+ * <p>Each concrete MetricOp is responsible for the following metrics computation: 1. Throughput:
+ * emit the number of elements processed in the PCollection. 2. Watermark progress: emit the
+ * watermark progress of the PCollection. 3. Latency: maintain the average arrival time per
+ * watermark across the elements it processes, then compute and emit the latency.
+ *
+ * @param <T> type of the message
+ */
+public abstract class SamzaMetricOp<T> implements Op<T, T, Void> {
+  // Unique name of the PTransform this MetricOp is associated with
+  protected final String transformFullName;
+  protected final BeamTransformMetricRegistry beamTransformMetricRegistry;
+  // Name or identifier of the PCollection which the PTransform is processing
+  protected final String pValue;
+  // List of input PValue(s) for all PCollections processing the PTransform
+  protected List<String> transformInputs;
+  // List of output PValue(s) for all PCollections processing the PTransform
+  protected List<String> transformOutputs;

Review Comment:
   Make this field transient.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {

Review Comment:
   s/Beam/Samza
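   Below is a hypothetical sketch of the renamed class. It follows the Javadoc's description of an `avgArrivalTimeMap` that tracks values per watermark and per PValue. The map nesting, the method names, and the latency formula are all assumptions for illustration, not the PR's implementation. The formula used here is the latest output average minus the earliest input average for the same watermark.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the renamed registry; layout and latency formula are assumptions.
class SamzaTransformMetricRegistrySketch {
  // transform name -> watermark -> PValue name -> average arrival time
  private final Map<String, Map<Long, Map<String, Long>>> avgArrivalTimeMap =
      new ConcurrentHashMap<>();

  // Records the average arrival time a metric op observed for one PValue at a watermark.
  void updateArrivalTime(String transform, long watermark, String pValue, long avgArrivalTime) {
    avgArrivalTimeMap
        .computeIfAbsent(transform, t -> new ConcurrentHashMap<>())
        .computeIfAbsent(watermark, w -> new ConcurrentHashMap<>())
        .put(pValue, avgArrivalTime);
  }

  // Latency for one watermark: latest output average minus earliest input average.
  // Empty until every input and output PValue has reported for that watermark.
  Optional<Long> latency(
      String transform, long watermark, List<String> inputs, List<String> outputs) {
    if (inputs.isEmpty() || outputs.isEmpty()) {
      return Optional.empty();
    }
    Map<String, Long> perPValue =
        avgArrivalTimeMap
            .getOrDefault(transform, Collections.emptyMap())
            .getOrDefault(watermark, Collections.emptyMap());
    if (!perPValue.keySet().containsAll(inputs) || !perPValue.keySet().containsAll(outputs)) {
      return Optional.empty();
    }
    long earliestInput = inputs.stream().mapToLong(perPValue::get).min().getAsLong();
    long latestOutput = outputs.stream().mapToLong(perPValue::get).max().getAsLong();
    // The entries for this watermark are no longer needed once latency is known.
    avgArrivalTimeMap.get(transform).remove(watermark);
    return Optional.of(latestOutput - earliestInput);
  }
}
```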



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -145,9 +152,25 @@ protected <KeyT, OutT> void registerInputMessageStreams(
   }
 
   public <OutT> void registerMessageStream(PValue pvalue, MessageStream<OpMessage<OutT>> stream) {
+    registerMessageStream(pvalue, stream, true);
+  }
+
+  public <OutT> void registerMessageStream(
+      PValue pvalue, MessageStream<OpMessage<OutT>> stream, boolean enableTransformMetric) {

Review Comment:
   Get rid of this boolean flag.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java:
##########
@@ -104,7 +104,7 @@ private static <K, InputT, OutputT> void doTranslate(
             outputTag,
             input.isBounded());
 
-    ctx.registerMessageStream(output, outputStream);
+    ctx.registerMessageStream(output, outputStream, false);

Review Comment:
   Why do we need to pass false here? We need to compute the transform metrics for GBK too, right?



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/BeamTransformMetricRegistry.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.metrics;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BeamTransformMetricRegistry is a registry that maintains the metrics for each transform. It
+ * maintains the average arrival time for each PCollection for a primitive transform.
+ *
+ * <p>For a non-data shuffling primitive transform, the average arrival time is calculated per
+ * watermark, per PCollection {@link org.apache.beam.sdk.values.PValue} and updated in
+ * avgArrivalTimeMap
+ */
+public class BeamTransformMetricRegistry implements Serializable {

Review Comment:
   Why does this need to be serializable? I'm not sure whether this ConcurrentHashMap is serializable.
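   `java.util.concurrent.ConcurrentHashMap` does implement `Serializable`, so the map by itself is not the problem. Serialization fails only when a key or value reachable from the map is not serializable. The probe below demonstrates both cases. `MetricHandle` is a hypothetical stand-in for a non-serializable metric object. This sketch does not check whether Samza's `Counter`, `Gauge`, and `Timer` are serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical metric object that does NOT implement Serializable.
class MetricHandle {
  long value;
}

class SerializationProbe {
  // Returns true if Java serialization can write the whole object graph,
  // false if some reachable object is not Serializable.
  static boolean canSerialize(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

   A map of `String` to `Long` serializes fine. A map holding `MetricHandle` values fails with `NotSerializableException`. If the registry only needs to exist at runtime, not serializing it at all may be simpler than making everything it holds serializable.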



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricOp.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.runners.samza.util.PipelineJsonRenderer;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
+
+/**
+ * MetricOp for default throughput, latency & watermark progress metric per transform for Beam Samza
+ * Runner. A MetricOp can be either attached to Input PCollection or Output PCollection of a
+ * PTransform.
+ *
+ * <p>A MetricOp is created per primitive PTransform, per PCollection across its inputs and
+ * outputs: 1. An independent MetricOp is created and attached to each input PCollection of the
+ * PTransform. 2. An independent MetricOp is created and attached to each output PCollection of the
+ * PTransform.
+ *
+ * <p>Each concrete MetricOp is responsible for the following metrics computation: 1. Throughput:
+ * emit the number of elements processed in the PCollection. 2. Watermark progress: emit the
+ * watermark progress of the PCollection. 3. Latency: maintain the average arrival time per
+ * watermark across the elements it processes, then compute and emit the latency.
+ *
+ * @param <T> type of the message
+ */
+public abstract class SamzaMetricOp<T> implements Op<T, T, Void> {
+  // Unique name of the PTransform this MetricOp is associated with
+  protected final String transformFullName;
+  protected final BeamTransformMetricRegistry beamTransformMetricRegistry;
+  // Name or identifier of the PCollection which the PTransform is processing
+  protected final String pValue;
+  // List of input PValue(s) for all PCollections processing the PTransform
+  protected List<String> transformInputs;
+  // List of output PValue(s) for all PCollections processing the PTransform
+  protected List<String> transformOutputs;
+  // Name of the task, for logging purpose
+  protected String task;

Review Comment:
   Make this field transient.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -159,12 +182,29 @@ public MessageStream<OpMessage<String>> getDummyStream() {
   }
 
   public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(PValue pvalue) {
+    return getMessageStream(pvalue, true);
+  }
+
+  public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(
+      PValue pvalue, boolean enableTransformMetric) {

Review Comment:
   Get rid of this flag.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -283,4 +323,13 @@ public StoreIdGenerator getStoreIdGenerator() {
     sendFn.accept(new EndOfStreamMessage(null));
     return dummyInput;
   }
+
+  boolean shouldDoAttachMetricOp(Config config, boolean enableTransformMetric) {

Review Comment:
   Remove this.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -159,12 +182,29 @@ public MessageStream<OpMessage<String>> getDummyStream() {
   }
 
   public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(PValue pvalue) {
+    return getMessageStream(pvalue, true);
+  }
+
+  public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(
+      PValue pvalue, boolean enableTransformMetric) {
     @SuppressWarnings("unchecked")
     final MessageStream<OpMessage<OutT>> stream =
         (MessageStream<OpMessage<OutT>>) messsageStreams.get(pvalue);
     if (stream == null) {
       throw new IllegalArgumentException("No stream registered for pvalue: " + pvalue);
     }
+
+    // add a step to attach InputMetricOp if registered for Op Stream
+    final Config overrideConfig = new MapConfig(getPipelineOptions().getConfigOverride());

Review Comment:
   Use pipeline options instead of Samza config.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaInputMetricOp.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.samza.metrics.BeamTransformMetricRegistry;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SamzaInputMetricOp is a {@link SamzaMetricOp} that emits & maintains default transform metrics
+ * for input PCollection to the transform. It emits the input throughput and maintains avg arrival
+ * time for input PCollection per watermark.
+ *
+ * <p>Assumes that {@code SamzaInputMetricOp#processWatermark(Instant, OpEmitter)} is mutually
+ * exclusive with {@code SamzaInputMetricOp#processElement(WindowedValue, OpEmitter)}. Specifically,
+ * processWatermark assumes that no calls to processElement are made during its execution, and vice
+ * versa.
+ *
+ * @param <T> The type of the elements in the input PCollection.
+ */
+public class SamzaInputMetricOp<T> extends SamzaMetricOp<T> {

Review Comment:
   This class looks almost the same as the OutputMetricOp class. I don't understand why we need three classes here (input, output, abstract). Let's consolidate them into one. You can tell whether it's an input or an output op by checking the input/output collection size.
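   The following is one possible reading of the consolidation, sketched under assumptions. A single class derives its role from where its PValue appears, so separate input and output subclasses are no longer needed. The reviewer suggests checking the input/output collection size, but this sketch checks list membership instead. `UnifiedMetricOpSketch`, its methods, and the metric-name format are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical consolidated metric op: the role (input or output) is derived from
// whether pValue appears among the transform's inputs or outputs.
class UnifiedMetricOpSketch<T> {
  private final String transformFullName;
  private final String pValue;
  private final boolean isInput;
  private final AtomicLong elementCount = new AtomicLong();

  UnifiedMetricOpSketch(
      String transformFullName, String pValue, List<String> inputs, List<String> outputs) {
    boolean inInputs = inputs.contains(pValue);
    boolean inOutputs = outputs.contains(pValue);
    if (inInputs == inOutputs) {
      throw new IllegalArgumentException(
          pValue + " must be exactly one of an input or an output of " + transformFullName);
    }
    this.transformFullName = transformFullName;
    this.pValue = pValue;
    this.isInput = inInputs;
  }

  // Pass-through, in the spirit of Op<T, T, Void>: count the element, return it unchanged.
  T processElement(T element) {
    elementCount.incrementAndGet();
    return element;
  }

  boolean isInput() {
    return isInput;
  }

  // Hypothetical naming scheme; only the direction tag depends on the role.
  String throughputMetricName() {
    return transformFullName + "." + (isInput ? "input" : "output") + "." + pValue + ".num-elements";
  }

  long elementCount() {
    return elementCount.get();
  }
}
```

   With one class, the throughput, watermark, and arrival-time logic lives in one place, and only the metric labels differ between the input and output cases.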



-- 
This is an automated message from the Apache Git Service.