You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/12/06 18:56:44 UTC

[2/2] samza git commit: SAMZA-2002: SamzaSQL Diagnostics: instrument rest of operators (except join & aggregate) and at Query level

SAMZA-2002: SamzaSQL Diagnostics: instrument rest of operators (except join & aggregate) and at Query level

Second phase of instrumenting SamzaSQL operators to add and maintain metrics. All operators, except join and aggregate, are instrumented to add Processing Time and Input Rate metrics. Whenever output rate could be different (e.g., filter operator) the output rate is also added. At query level, we have Query Latency, and input and output rates.

Author: Shenoda Guirguis <sg...@linkedin.com>

Reviewers: Srinivasulu Punuru <sp...@linkedin.com>, Aditya Toomula <at...@linkedin.com>

Closes #831 from shenodaguirguis/addmetrics.3


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bd9387b7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bd9387b7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bd9387b7

Branch: refs/heads/master
Commit: bd9387b7f8a455c733ccbcb2347bdc30be7716dd
Parents: b126683
Author: Shenoda Guirguis <sg...@linkedin.com>
Authored: Thu Dec 6 10:56:38 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Thu Dec 6 10:56:38 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/sql/SamzaSqlRelRecord.java |   7 +-
 .../apache/samza/sql/avro/AvroRelConverter.java |   5 +-
 .../samza/sql/data/SamzaSqlRelMessage.java      |  30 ++++-
 .../samza/sql/data/SamzaSqlRelMsgMetadata.java  | 105 +++++++++++++++++
 .../samza/sql/translator/FilterTranslator.java  |  63 +++++++++--
 .../samza/sql/translator/JoinTranslator.java    |  14 +--
 .../translator/LogicalAggregateTranslator.java  |   4 +-
 .../samza/sql/translator/ProjectTranslator.java |  43 ++++---
 .../samza/sql/translator/QueryTranslator.java   | 113 ++++++++++++++++---
 .../translator/SamzaSqlTableJoinFunction.java   |   2 +-
 .../samza/sql/translator/ScanTranslator.java    |  59 ++++++++--
 .../sql/translator/TranslatorConstants.java     |  32 ++++++
 .../samza/sql/data/TestSamzaSqlRelMessage.java  |  19 ++--
 .../TestSamzaSqlRelMessageSerde.java            |   4 +-
 .../serializers/TestSamzaSqlRelRecordSerde.java |   5 +-
 .../sql/translator/TestFilterTranslator.java    |  56 ++++++---
 .../sql/translator/TestJoinTranslator.java      |  25 ++--
 .../sql/translator/TestProjectTranslator.java   |  34 ++++--
 .../sql/translator/TestQueryTranslator.java     |  39 ++++---
 .../TestSamzaSqlLocalTableJoinFunction.java     |  16 ++-
 .../TestSamzaSqlRemoteTableJoinFunction.java    |   5 +-
 21 files changed, 541 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
index a877e6b..d156470 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
@@ -89,10 +89,9 @@ public class SamzaSqlRelRecord implements Serializable {
    * @return returns the value of the field.
    */
   public Optional<Object> getField(String name) {
-    for (int index = 0; index < fieldNames.size(); index++) {
-      if (fieldNames.get(index).equals(name)) {
-        return Optional.ofNullable(fieldValues.get(index));
-      }
+    int index = fieldNames.indexOf(name);
+    if (index != -1) {
+      return Optional.ofNullable(fieldValues.get(index));
     }
 
     return Optional.empty();

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index ef6c02b..0d452dd 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -20,6 +20,7 @@
 package org.apache.samza.sql.avro;
 
 import java.nio.ByteBuffer;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.system.SystemStream;
 import org.slf4j.Logger;
@@ -92,7 +94,8 @@ public class AvroRelConverter implements SamzaRelConverter {
       throw new SamzaException(msg);
     }
 
-    return new SamzaSqlRelMessage(samzaMessage.getKey(), payloadFieldNames, payloadFieldValues);
+    return new SamzaSqlRelMessage(samzaMessage.getKey(), payloadFieldNames, payloadFieldValues,
+        new SamzaSqlRelMsgMetadata("", "", ""));
   }
 
   public void fetchFieldNamesAndValuesFromIndexedRecord(IndexedRecord record, List<String> fieldNames,

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index 55ce7b0..13c896a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -20,6 +20,7 @@
 package org.apache.samza.sql.data;
 
 import java.io.Serializable;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -45,6 +46,12 @@ public class SamzaSqlRelMessage implements Serializable {
   private final SamzaSqlRelRecord samzaSqlRelRecord;
 
   /**
+   * Holds metadata about the message or event, e.g., the eventTime timestamp.
+   */
+  @JsonProperty("samzaSqlRelMsgMetadata")
+  private SamzaSqlRelMsgMetadata samzaSqlRelMsgMetadata;
+
+  /**
    * Creates a {@link SamzaSqlRelMessage} from the list of relational fields and values.
    * If the field list contains KEY, then it extracts the key out of the fields to create a
    * {@link SamzaSqlRelRecord} along with key, otherwise creates a {@link SamzaSqlRelRecord}
@@ -53,9 +60,11 @@ public class SamzaSqlRelMessage implements Serializable {
    * @param fieldValues  Ordered list of all the values in the row. Some of the fields can be null, This could be
    *                     result of delete change capture event in the stream or because of the result of the outer join
    *                     or the fields themselves are null in the original stream.
+   * @param metadata the message/event's metadata
    */
-  public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues) {
+  public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues, SamzaSqlRelMsgMetadata metadata) {
     Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
+    Validate.notNull(metadata, "Message metadata is NULL");
 
     int keyIndex = fieldNames.indexOf(KEY_NAME);
     Object key = null;
@@ -64,8 +73,10 @@ public class SamzaSqlRelMessage implements Serializable {
     }
     this.key = key;
     this.samzaSqlRelRecord = new SamzaSqlRelRecord(fieldNames, fieldValues);
+    this.samzaSqlRelMsgMetadata = metadata;
   }
 
+
   /**
    * Create the SamzaSqlRelMessage, Each rel message represents a row in the table.
    * So it can contain a key and a list of fields in the row.
@@ -74,9 +85,11 @@ public class SamzaSqlRelMessage implements Serializable {
    * @param fieldValues Ordered list of all the values in the row. Some of the fields can be null, This could be result of
    *               delete change capture event in the stream or because of the result of the outer join or the fields
    *               themselves are null in the original stream.
+   * @param metadata the message/event's metadata
    */
-  public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fieldValues) {
+  public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fieldValues, SamzaSqlRelMsgMetadata metadata) {
     Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
+    Validate.notNull(metadata, "Message metadata is NULL");
 
     List<String> tmpFieldNames = new ArrayList<>();
     List<Object> tmpFieldValues = new ArrayList<>();
@@ -89,14 +102,17 @@ public class SamzaSqlRelMessage implements Serializable {
     tmpFieldValues.addAll(fieldValues);
 
     this.samzaSqlRelRecord = new SamzaSqlRelRecord(tmpFieldNames, tmpFieldValues);
+    this.samzaSqlRelMsgMetadata = metadata;
   }
 
   /**
    * Creates the SamzaSqlRelMessage from {@link SamzaSqlRelRecord}.
    * @param samzaSqlRelRecord represents the rel record.
+   * @param metadata the message/event's metadata
    */
-  public SamzaSqlRelMessage(@JsonProperty("samzaSqlRelRecord") SamzaSqlRelRecord samzaSqlRelRecord) {
-    this(samzaSqlRelRecord.getFieldNames(), samzaSqlRelRecord.getFieldValues());
+  public SamzaSqlRelMessage(@JsonProperty("samzaSqlRelRecord") SamzaSqlRelRecord samzaSqlRelRecord,
+      @JsonProperty("samzaSqlRelMsgMetadata") SamzaSqlRelMsgMetadata metadata) {
+    this(samzaSqlRelRecord.getFieldNames(), samzaSqlRelRecord.getFieldValues(), metadata);
   }
 
   @JsonProperty("samzaSqlRelRecord")
@@ -104,6 +120,9 @@ public class SamzaSqlRelMessage implements Serializable {
     return samzaSqlRelRecord;
   }
 
+  @JsonProperty("samzaSqlRelMsgMetadata")
+  public SamzaSqlRelMsgMetadata getSamzaSqlRelMsgMetadata() { return samzaSqlRelMsgMetadata; }
+
   public Object getKey() {
     return key;
   }
@@ -127,7 +146,7 @@ public class SamzaSqlRelMessage implements Serializable {
 
   @Override
   public String toString() {
-    return "RelMessage: {" + samzaSqlRelRecord + "}";
+    return "RelMessage: {" + samzaSqlRelRecord + " " + samzaSqlRelMsgMetadata + "}";
   }
 
   /**
@@ -172,4 +191,5 @@ public class SamzaSqlRelMessage implements Serializable {
     }
     return keyPartNames;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
new file mode 100644
index 0000000..1bdd5f6
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data;
+
+import java.io.Serializable;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Metadata of Samza Sql Rel Message. Contains metadata about the corresponding event or
+ * relational row of a table. Used as member of the {@link SamzaSqlRelMessage}.
+ */
+public class SamzaSqlRelMsgMetadata implements Serializable {
+  /**
+   * Boolean indicating whether this message comes from a new input message. When
+   * Project:flatten() is used, this makes it possible to determine the number of original
+   * input messages. The default is true, for the case when no flatten() is used.
+   */
+  public boolean isNewInputMessage = true;
+
+  /**
+   * The timestamp of when the events actually happened
+   * set by and copied from the event source
+   * TODO: copy eventTime through from source to RelMessage
+   */
+  @JsonProperty("eventTime")
+  private String eventTime;
+
+  /**
+   * the timestamp of when Samza App received the event
+   * TODO: set arrivalTime during conversion from IME to SamzaMessage
+   */
+  @JsonProperty("arrivalTime")
+  private String arrivalTime;
+
+  /**
+   * the timestamp when SamzaSQL query starts processing the event
+   * set by the SamzaSQL Scan operator
+   */
+  @JsonProperty("scanTime")
+  private String scanTime;
+
+  public SamzaSqlRelMsgMetadata(@JsonProperty("eventTime") String eventTime, @JsonProperty("arrivalTime") String arrivalTime,
+      @JsonProperty("scanTime") String scanTime) {
+    this.eventTime = eventTime;
+    this.arrivalTime = arrivalTime;
+    this.scanTime = scanTime;
+  }
+
+  public SamzaSqlRelMsgMetadata(String eventTime, String arrivalTime, String scanTime, boolean isNewInputMessage) {
+    this(eventTime, arrivalTime, scanTime);
+    this.isNewInputMessage = isNewInputMessage;
+  }
+
+  @JsonProperty("eventTime")
+  public String getEventTime() { return eventTime;}
+
+  public void setEventTime(String eventTime) {
+    this.eventTime = eventTime;
+  }
+
+  public boolean hasEventTime() { return eventTime != null && !eventTime.isEmpty(); }
+
+  @JsonProperty("arrivalTime")
+  public String getarrivalTime() { return arrivalTime;}
+
+  public void setArrivalTime(String arrivalTime) {
+    this.arrivalTime = arrivalTime;
+  }
+
+  public boolean hasArrivalTime() { return arrivalTime != null && !arrivalTime.isEmpty(); }
+
+
+  @JsonProperty("scanTime")
+  public String getscanTime() { return scanTime;}
+
+  public void setScanTime(String scanTime) {
+    this.scanTime = scanTime;
+  }
+
+  public boolean hasScanTime() { return scanTime != null && !scanTime.isEmpty(); }
+
+  @Override
+  public String toString() {
+    return "[Metadata:{" + eventTime + " " + arrivalTime + " " + scanTime + "}]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index 3cbcc48..911024a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -19,10 +19,17 @@
 
 package org.apache.samza.sql.translator;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SamzaHistogram;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.sql.data.Expression;
@@ -45,49 +52,87 @@ class FilterTranslator {
     this.queryId = queryId;
   }
 
+  /**
+   * FilterTranslatorFunction to process input events, apply the filter and produce output
+   * events accordingly
+   */
   private static class FilterTranslatorFunction implements FilterFunction<SamzaSqlRelMessage> {
     private transient Expression expr;
-    private transient TranslatorContext context;
+    private transient TranslatorContext translatorContext;
     private transient LogicalFilter filter;
-    private final int queryId;
+    private transient MetricsRegistry metricsRegistry;
+    private transient SamzaHistogram processingTime; // milli-seconds
+    private transient Counter inputEvents;
+    private transient Counter filteredOutEvents;
+    private transient Counter outputEvents;
 
+    private final int queryId;
     private final int filterId;
+    private final String logicalOpId;
 
-    FilterTranslatorFunction(int filterId, int queryId) {
+    FilterTranslatorFunction(int filterId, int queryId, String logicalOpId) {
       this.filterId = filterId;
       this.queryId = queryId;
+      this.logicalOpId = logicalOpId;
     }
 
     @Override
     public void init(Context context) {
-      this.context = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
-      this.filter = (LogicalFilter) this.context.getRelNode(filterId);
-      this.expr = this.context.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
+      this.translatorContext = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
+      this.filter = (LogicalFilter) this.translatorContext.getRelNode(filterId);
+      this.expr = this.translatorContext.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
+      ContainerContext containerContext = context.getContainerContext();
+      metricsRegistry = containerContext.getContainerMetricsRegistry();
+      processingTime = new SamzaHistogram(metricsRegistry, logicalOpId, TranslatorConstants.PROCESSING_TIME_NAME);
+      inputEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.INPUT_EVENTS_NAME);
+      inputEvents.clear();
+      filteredOutEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.FILTERED_EVENTS_NAME);
+      filteredOutEvents.clear();
+      outputEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.OUTPUT_EVENTS_NAME);
+      outputEvents.clear();
     }
 
     @Override
     public boolean apply(SamzaSqlRelMessage message) {
+      Instant startProcessing = Instant.now();
       Object[] result = new Object[1];
-      expr.execute(context.getExecutionContext(), context.getDataContext(),
+      expr.execute(translatorContext.getExecutionContext(), translatorContext.getDataContext(),
           message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
       if (result.length > 0 && result[0] instanceof Boolean) {
         boolean retVal = (Boolean) result[0];
         log.debug(
             String.format("return value for input %s is %s",
                 Arrays.asList(message.getSamzaSqlRelRecord().getFieldValues()).toString(), retVal));
+        updateMetrics(startProcessing, retVal, Instant.now());
         return retVal;
       } else {
         log.error("return value is not boolean");
         return false;
       }
     }
+
+    /**
+     * Updates the MetricsRegistry of this operator
+     * @param startProcessing = begin processing of the message
+     * @param endProcessing = end of processing
+     */
+    private void updateMetrics(Instant startProcessing, boolean isOutput, Instant endProcessing) {
+      inputEvents.inc();
+      if (isOutput) {
+        outputEvents.inc();
+      } else {
+        filteredOutEvents.inc();
+      }
+      processingTime.update(Duration.between(startProcessing, endProcessing).toMillis());
+    }
+
   }
 
-  void translate(final LogicalFilter filter, final TranslatorContext context) {
+  void translate(final LogicalFilter filter, final String logicalOpId, final TranslatorContext context) {
     MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(filter.getInput().getId());
     final int filterId = filter.getId();
 
-    MessageStream<SamzaSqlRelMessage> outputStream = inputStream.filter(new FilterTranslatorFunction(filterId, queryId));
+    MessageStream<SamzaSqlRelMessage> outputStream = inputStream.filter(new FilterTranslatorFunction(filterId, queryId, logicalOpId));
 
     context.registerMessageStream(filterId, outputStream);
     context.registerRelNode(filterId, filter);

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 7b8ff58..3a858f8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -84,9 +84,9 @@ class JoinTranslator {
     this.queryId = queryId;
   }
 
-  void translate(final LogicalJoin join, final TranslatorContext context) {
-    JoinInputNode.InputType inputTypeOnLeft = getInputType(join.getLeft(), context);
-    JoinInputNode.InputType inputTypeOnRight = getInputType(join.getRight(), context);
+  void translate(final LogicalJoin join, final TranslatorContext translatorContext) {
+    JoinInputNode.InputType inputTypeOnLeft = getInputType(join.getLeft(), translatorContext);
+    JoinInputNode.InputType inputTypeOnRight = getInputType(join.getRight(), translatorContext);
 
     // Do the validation of join query
     validateJoinQuery(join, inputTypeOnLeft, inputTypeOnRight);
@@ -109,13 +109,13 @@ class JoinTranslator {
     JoinInputNode tableNode = new JoinInputNode(isTablePosOnRight ? join.getRight() : join.getLeft(), tableKeyIds,
         isTablePosOnRight ? inputTypeOnRight : inputTypeOnLeft, isTablePosOnRight);
 
-    MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(streamNode.getRelNode().getId());
-    Table table = getTable(tableNode, context);
+    MessageStream<SamzaSqlRelMessage> inputStream = translatorContext.getMessageStream(streamNode.getRelNode().getId());
+    Table table = getTable(tableNode, translatorContext);
 
     MessageStream<SamzaSqlRelMessage> outputStream =
-        joinStreamWithTable(inputStream, table, streamNode, tableNode, join, context);
+        joinStreamWithTable(inputStream, table, streamNode, tableNode, join, translatorContext);
 
-    context.registerMessageStream(join.getId(), outputStream);
+    translatorContext.registerMessageStream(join.getId(), outputStream);
   }
 
   private MessageStream<SamzaSqlRelMessage> joinStreamWithTable(MessageStream<SamzaSqlRelMessage> inputStream,

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
index ac3d4e9..357d7e6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
@@ -20,6 +20,7 @@
 package org.apache.samza.sql.translator;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.calcite.rel.logical.LogicalAggregate;
@@ -32,6 +33,7 @@ import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.LongSerde;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,7 +80,7 @@ class LogicalAggregateTranslator {
                 List<Object> fieldValues = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldValues();
                 fieldNames.add(aggFieldNames.get(0));
                 fieldValues.add(windowPane.getMessage());
-                return new SamzaSqlRelMessage(fieldNames, fieldValues);
+                return new SamzaSqlRelMessage(fieldNames, fieldValues, new SamzaSqlRelMsgMetadata("", "", ""));
               });
     context.registerMessageStream(aggregate.getId(), outputStream);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 990d7db..3378788 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.sql.translator;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -43,6 +42,7 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.sql.data.Expression;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
 class ProjectTranslator {
 
   private static final Logger LOG = LoggerFactory.getLogger(ProjectTranslator.class);
+  //private transient int messageIndex = 0;
   private final int queryId;
   ProjectTranslator(int queryId) {
     this.queryId = queryId;
@@ -64,20 +65,18 @@ class ProjectTranslator {
    * ProjectMapFunction implements MapFunction to map input SamzaSqlRelMessages, one at a time, to a new
    * SamzaSqlRelMessage which consists of the projected fields
    */
-  @VisibleForTesting
-  public static class ProjectMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
+  private static class ProjectMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
     private transient Project project;
     private transient Expression expr;
     private transient TranslatorContext translatorContext;
     private transient MetricsRegistry metricsRegistry;
     private transient SamzaHistogram processingTime; // milli-seconds
-    private transient Counter numEvents;
+    private transient Counter inputEvents;
+    private transient Counter outputEvents;
 
     private final int queryId;
     private final int projectId;
     private final String logicalOpId;
-    private final String PROCESSING_TIME_NAME = "processingTime";
-    private final String NUM_EVENTS_NAME = "numEvents";
 
     ProjectMapFunction(int projectId, int queryId, String logicalOpId) {
       this.projectId = projectId;
@@ -96,9 +95,11 @@ class ProjectTranslator {
       this.expr = this.translatorContext.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
       ContainerContext containerContext = context.getContainerContext();
       metricsRegistry = containerContext.getContainerMetricsRegistry();
-      processingTime = new SamzaHistogram(metricsRegistry, logicalOpId, PROCESSING_TIME_NAME);
-      numEvents = metricsRegistry.newCounter(logicalOpId, NUM_EVENTS_NAME);
-      numEvents.clear();
+      processingTime = new SamzaHistogram(metricsRegistry, logicalOpId, TranslatorConstants.PROCESSING_TIME_NAME);
+      inputEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.INPUT_EVENTS_NAME);
+      inputEvents.clear();
+      outputEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.OUTPUT_EVENTS_NAME);
+      outputEvents.clear();
     }
 
     /**
@@ -117,18 +118,22 @@ class ProjectTranslator {
       for (int index = 0; index < output.length; index++) {
         names.add(index, project.getNamedProjects().get(index).getValue());
       }
-      updateMetrics(arrivalTime, Instant.now());
-      return new SamzaSqlRelMessage(names, Arrays.asList(output));
+      updateMetrics(arrivalTime, Instant.now(), message.getSamzaSqlRelMsgMetadata().isNewInputMessage);
+      return new SamzaSqlRelMessage(names, Arrays.asList(output), message.getSamzaSqlRelMsgMetadata());
     }
 
     /**
      * Updates the Diagnostics Metrics (processing time and number of events)
      * @param arrivalTime input message arrival time (= beging of processing in this operator)
      * @param outputTime output message output time (=end of processing in this operator)
+     * @param isNewInputMessage whether the message comes from a new input message or not
      */
-    private void updateMetrics(Instant arrivalTime, Instant outputTime) {
-      numEvents.inc();
-      processingTime.update(Duration.between(arrivalTime, outputTime).toNanos() / 1000L);
+    private void updateMetrics(Instant arrivalTime, Instant outputTime, boolean isNewInputMessage) {
+      if (isNewInputMessage) {
+        inputEvents.inc();
+      }
+      outputEvents.inc();
+      processingTime.update(Duration.between(arrivalTime, outputTime).toMillis());
     }
 
   }
@@ -137,16 +142,22 @@ class ProjectTranslator {
       MessageStream<SamzaSqlRelMessage> inputStream) {
     return inputStream.flatMap(message -> {
       Object field = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex);
-
       if (field != null && field instanceof List) {
         List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
+        SamzaSqlRelMsgMetadata messageMetadata = message.getSamzaSqlRelMsgMetadata();
+        SamzaSqlRelMsgMetadata newMetadata = new SamzaSqlRelMsgMetadata(messageMetadata.getEventTime(),
+            messageMetadata.getarrivalTime(), messageMetadata.getscanTime(), true);
         for (Object fieldValue : (List) field) {
           List<Object> newValues = new ArrayList<>(message.getSamzaSqlRelRecord().getFieldValues());
           newValues.set(flattenIndex, Collections.singletonList(fieldValue));
-          outMessages.add(new SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues));
+          outMessages.add(new SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues,
+              newMetadata));
+          newMetadata = new SamzaSqlRelMsgMetadata(newMetadata.getEventTime(), newMetadata.getarrivalTime(),
+              newMetadata.getscanTime(), false);
         }
         return outMessages;
       } else {
+        message.getSamzaSqlRelMsgMetadata().isNewInputMessage = true;
         return Collections.singletonList(message);
       }
     });

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 12e83c3..13d0a25 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -20,6 +20,8 @@
 package org.apache.samza.sql.translator;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -42,6 +44,9 @@ import org.apache.samza.context.Context;
 import org.apache.samza.context.ExternalContext;
 import org.apache.samza.context.JobContext;
 import org.apache.samza.context.TaskContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SamzaHistogram;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -50,6 +55,7 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.planner.QueryPlanner;
@@ -60,6 +66,8 @@ import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.system.descriptors.GenericOutputDescriptor;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.TableDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -73,15 +81,40 @@ public class QueryTranslator {
   private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
   private final Map<String, MessageStream<KV<Object, Object>>> inputMsgStreams;
   private final Map<String, OutputStream> outputMsgStreams;
+  private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class);
+  static int opId = 0;
 
+  /**
+   * map function used by SendToOutputStream to convert SamzaRelMessage to KV
+   * it also maintains SendTo and most Query metrics
+   */
   private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
     private transient SamzaRelConverter samzaMsgConverter;
+    private transient MetricsRegistry metricsRegistry;
+    /**
+     * TODO: [SAMZA-2031]: the time-based metrics here for insert and query are
+     * currently not accurate because they don't include the time of sendTo() call
+     * It is not feasible to include it because sendTo operator does not return
+     * a stream to process its messages to update the metrics.
+     */
+    /* insert (SendToOutputStream) metrics */
+    private transient SamzaHistogram insertProcessingTime;
+    /* query metrics */
+    private transient SamzaHistogram totalLatency; // (if event time exists) = output time - event time (msec)
+    private transient SamzaHistogram queryLatency; // = output time - scan time (msec)
+    private transient SamzaHistogram queueingLatency; // = scan time - arrival time (msec)
+    private transient Counter queryOutputEvents;
+
     private final String outputTopic;
     private final int queryId;
+    private String queryLogicalId;
+    private String insertLogicalId;
 
-    OutputMapFunction(String outputTopic, int queryId) {
+    OutputMapFunction(String queryLogicalId, String insertLogicalId, String outputTopic, int queryId) {
       this.outputTopic = outputTopic;
       this.queryId = queryId;
+      this.queryLogicalId = queryLogicalId;
+      this.insertLogicalId = insertLogicalId;
     }
 
     @Override
@@ -89,13 +122,56 @@ public class QueryTranslator {
       TranslatorContext translatorContext =
           ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
       this.samzaMsgConverter = translatorContext.getMsgConverter(outputTopic);
+      ContainerContext containerContext = context.getContainerContext();
+      metricsRegistry = containerContext.getContainerMetricsRegistry();
+      /* insert (SendToOutputStream) metrics */
+      insertProcessingTime = new SamzaHistogram(metricsRegistry, insertLogicalId, TranslatorConstants.TOTAL_LATENCY_NAME);;
+      /* query metrics */
+      totalLatency = new SamzaHistogram(metricsRegistry, queryLogicalId, TranslatorConstants.TOTAL_LATENCY_NAME);;
+      queryLatency = new SamzaHistogram(metricsRegistry, queryLogicalId, TranslatorConstants.QUERY_LATENCY_NAME);
+      queueingLatency = new SamzaHistogram(metricsRegistry, queryLogicalId, TranslatorConstants.QUEUEING_LATENCY_NAME);;
+      queryOutputEvents = metricsRegistry.newCounter(queryLogicalId, TranslatorConstants.OUTPUT_EVENTS_NAME);
+      queryOutputEvents.clear();
     }
 
     @Override
     public KV<Object, Object> apply(SamzaSqlRelMessage message) {
-      return this.samzaMsgConverter.convertToSamzaMessage(message);
+      Instant beginProcessing = Instant.now();
+      KV<Object, Object> retKV = this.samzaMsgConverter.convertToSamzaMessage(message);
+      updateMetrics(beginProcessing, Instant.now(), message.getSamzaSqlRelMsgMetadata());
+      return  retKV;
     }
-  }
+
+    /**
+     * Updates the Diagnostics Metrics (processing time and number of events)
+     * @param beginProcessing when sendOutput Started processing this message
+     * @param endProcessing when sendOutput finished processing this message
+     * @param metadata the event's message metadata
+     */
+    private void updateMetrics(Instant beginProcessing, Instant endProcessing, SamzaSqlRelMsgMetadata metadata) {
+      /* insert (SendToOutputStream) metrics */
+      insertProcessingTime.update(Duration.between(beginProcessing, endProcessing).toMillis());
+      /* query metrics */
+      Instant outputTime = Instant.now();
+      queryOutputEvents.inc();
+      /* TODO: remove scanTime validation once code to assign it is stable */
+      Validate.isTrue(metadata.hasScanTime());
+      Instant scanTime = Instant.parse(metadata.getscanTime());
+      queryLatency.update(Duration.between(scanTime, outputTime).toMillis());
+      /** TODO: change if hasArrivalTime to validation once arrivalTime is assigned,
+                and later remove the check once code is stable */
+      if (metadata.hasArrivalTime()) {
+        Instant arrivalTime = Instant.parse(metadata.getarrivalTime());
+        queueingLatency.update(Duration.between(arrivalTime, scanTime).toMillis());
+      }
+      /* since availability of eventTime depends on source, we need the following check */
+      if (metadata.hasEventTime()) {
+        Instant eventTime = Instant.parse(metadata.getEventTime());
+        totalLatency.update(Duration.between(eventTime, outputTime).toMillis());
+      }
+
+    }
+  } // OutputMapFunction
 
   public QueryTranslator(StreamApplicationDescriptor appDesc, SamzaSqlApplicationConfig sqlConfig) {
     this.sqlConfig = sqlConfig;
@@ -139,14 +215,16 @@ public class QueryTranslator {
    */
   public void translate(RelRoot relRoot, String outputSystemStream, TranslatorContext translatorContext, int queryId) {
     final RelNode node = relRoot.project();
-
     ScanTranslator scanTranslator =
         new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource(), queryId);
 
+    /* update input metrics */
+    String queryLogicalId = String.format(TranslatorConstants.LOGSQLID_TEMPLATE, queryId);
+    //new InputMetricsMapFunction(queryLogicalId));
+
+    opId = 0;
+
     node.accept(new RelShuttleImpl() {
-      int windowId = 0;
-      int joinId = 0;
-      int opId = 0;
 
       @Override
       public RelNode visit(RelNode relNode) {
@@ -158,21 +236,23 @@ public class QueryTranslator {
       @Override
       public RelNode visit(TableScan scan) {
         RelNode node = super.visit(scan);
-        scanTranslator.translate(scan, translatorContext, systemDescriptors, inputMsgStreams);
+        String logicalOpId = String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "scan", opId++);
+        scanTranslator.translate(scan, queryLogicalId, logicalOpId, translatorContext, systemDescriptors, inputMsgStreams);
         return node;
       }
 
       @Override
       public RelNode visit(LogicalFilter filter) {
         RelNode node = visitChild(filter, 0, filter.getInput());
-        new FilterTranslator(queryId).translate(filter, translatorContext);
+        String logicalOpId = String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "filter", opId++);
+        new FilterTranslator(queryId).translate(filter, logicalOpId, translatorContext);
         return node;
       }
 
       @Override
       public RelNode visit(LogicalProject project) {
         RelNode node = super.visit(project);
-        String logicalOpId = "sql" + Integer.toString(queryId) + "_project" + Integer.toString(opId++);
+        String logicalOpId = String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "project", opId++);
         new ProjectTranslator(queryId).translate(project, logicalOpId, translatorContext);
         return node;
       }
@@ -180,7 +260,7 @@ public class QueryTranslator {
       @Override
       public RelNode visit(LogicalJoin join) {
         RelNode node = super.visit(join);
-        String logicalOpId = "sql" + Integer.toString(queryId) + "_join" + Integer.toString(opId++);
+        String logicalOpId = String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "join", opId++);
         new JoinTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix(), queryId)
             .translate(join, translatorContext);
         return node;
@@ -189,20 +269,21 @@ public class QueryTranslator {
       @Override
       public RelNode visit(LogicalAggregate aggregate) {
         RelNode node = super.visit(aggregate);
-        String logicalOpId = "sql" + Integer.toString(queryId) + "_window" + Integer.toString(opId++);
+        String logicalOpId = String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "window", opId++);
         new LogicalAggregateTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix())
             .translate(aggregate, translatorContext);
         return node;
       }
     });
 
-    sendToOutputStream(outputSystemStream, streamAppDescriptor, translatorContext, node, queryId);
+    String logicalOpId = String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "insert", opId);
+    sendToOutputStream(queryLogicalId, logicalOpId, outputSystemStream, streamAppDescriptor, translatorContext, node, queryId);
   }
 
-  private void sendToOutputStream(String sinkStream, StreamApplicationDescriptor appDesc, TranslatorContext context, RelNode node, int queryId) {
+  private void sendToOutputStream(String queryLogicalId, String logicalOpId, String sinkStream, StreamApplicationDescriptor appDesc, TranslatorContext translatorContext, RelNode node, int queryId) {
     SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sinkStream);
-    MessageStream<SamzaSqlRelMessage> stream = context.getMessageStream(node.getId());
-    MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(sinkStream, queryId));
+    MessageStream<SamzaSqlRelMessage> stream = translatorContext.getMessageStream(node.getId());
+    MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(queryLogicalId, logicalOpId, sinkStream, queryId));
     Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
     if (!tableDescriptor.isPresent()) {
       KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
index b80f89c..0715af3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
@@ -105,7 +105,7 @@ public abstract class SamzaSqlTableJoinFunction<K, R>
       outFieldValues.addAll(message.getSamzaSqlRelRecord().getFieldValues());
     }
 
-    return new SamzaSqlRelMessage(outFieldNames, outFieldValues);
+    return new SamzaSqlRelMessage(outFieldNames, outFieldValues, message.getSamzaSqlRelMsgMetadata());
   }
 
   protected abstract List<Object> getTableRelRecordFieldValues(R record);

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index e564cae..aa73f94 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -19,17 +19,21 @@
 
 package org.apache.samza.sql.translator;
 
+import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
-import org.apache.samza.operators.functions.FilterFunction;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SamzaHistogram;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -37,6 +41,8 @@ import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 
 
@@ -80,17 +86,29 @@ class ScanTranslator {
     this.queryId = queryId;
   }
 
+  /**
+   * ScanMapFunction implements MapFunction to convert input KV messages into output
+   * SamzaSqlRelMessages, performing the table scan
+   */
   private static class ScanMapFunction implements MapFunction<KV<Object, Object>, SamzaSqlRelMessage> {
     // All the user-supplied functions are expected to be serializable in order to enable full serialization of user
     // DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator
     // initialization.
     private transient SamzaRelConverter msgConverter;
+    private transient MetricsRegistry metricsRegistry;
+    private transient SamzaHistogram processingTime; // milli-seconds
+    private transient Counter queryInputEvents;
+
     private final String streamName;
     private final int queryId;
+    private final String queryLogicalId;
+    private final String logicalOpId;
 
-    ScanMapFunction(String sourceStreamName, int queryId) {
+    ScanMapFunction(String sourceStreamName, int queryId, String queryLogicalId, String logicalOpId) {
       this.streamName = sourceStreamName;
       this.queryId = queryId;
+      this.queryLogicalId = queryLogicalId;
+      this.logicalOpId = logicalOpId;
     }
 
     @Override
@@ -98,15 +116,35 @@ class ScanTranslator {
       TranslatorContext translatorContext =
           ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
       this.msgConverter = translatorContext.getMsgConverter(streamName);
+      ContainerContext containerContext = context.getContainerContext();
+      metricsRegistry = containerContext.getContainerMetricsRegistry();
+      processingTime = new SamzaHistogram(metricsRegistry, logicalOpId, TranslatorConstants.PROCESSING_TIME_NAME);
+      queryInputEvents = metricsRegistry.newCounter(queryLogicalId, TranslatorConstants.INPUT_EVENTS_NAME);
+      queryInputEvents.clear();
     }
 
     @Override
     public SamzaSqlRelMessage apply(KV<Object, Object> message) {
-      return this.msgConverter.convertToRelMessage(message);
+      Instant startProcessing = Instant.now();
+      SamzaSqlRelMessage retMsg = this.msgConverter.convertToRelMessage(message);
+      retMsg.getSamzaSqlRelMsgMetadata().setScanTime(startProcessing.toString());
+      updateMetrics(startProcessing, Instant.now());
+      return retMsg;
     }
-  }
 
-  void translate(final TableScan tableScan, final TranslatorContext context,
+    /**
+     * Updates the MetricsRegistry of this operator
+     * @param startProcessing = begin processing of the message
+     * @param endProcessing = end of processing
+     */
+    private void updateMetrics(Instant startProcessing, Instant endProcessing) {
+      queryInputEvents.inc();
+      processingTime.update(Duration.between(startProcessing, endProcessing).toMillis());
+    }
+
+  } // ScanMapFunction
+
+  void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId, final TranslatorContext context,
       Map<String, DelegatingSystemDescriptor> systemDescriptors, Map<String, MessageStream<KV<Object, Object>>> inputMsgStreams) {
     StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
     List<String> tableNameParts = tableScan.getTable().getQualifiedName();
@@ -136,10 +174,11 @@ class ScanTranslator {
 
     MessageStream<KV<Object, Object>> inputStream =
         inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd));
-    MessageStream<KV<Object, Object>> outputStream =
-        inputStream.filter(new FilterSystemMessageFunction(sourceName, queryId));
     MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream =
-        outputStream.map(new ScanMapFunction(sourceName, queryId));
+        inputStream
+            .filter(new FilterSystemMessageFunction(sourceName, queryId))
+            .map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId));
+
     context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorConstants.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorConstants.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorConstants.java
new file mode 100644
index 0000000..c2f9ea5
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.translator;
+
+public class TranslatorConstants {
+  public static final String PROCESSING_TIME_NAME = "processingTimeMs";
+  public static final String TOTAL_LATENCY_NAME = "totalLatencyMs";
+  public static final String QUERY_LATENCY_NAME = "queryLatencyMs";
+  public static final String QUEUEING_LATENCY_NAME = "queueingLatencyMs";
+  public static final String INPUT_EVENTS_NAME = "inputEvents";
+  public static final String FILTERED_EVENTS_NAME = "filteredEvents";
+  public static final String OUTPUT_EVENTS_NAME = "outputEvents";
+  public static final String LOGOPID_TEMPLATE = "sql_%d_%s_%d";
+  public static final String LOGSQLID_TEMPLATE = "sql_%d";
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
index a4f3afc..dffb77e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.data;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -35,31 +36,33 @@ public class TestSamzaSqlRelMessage {
 
   @Test
   public void testGetField() {
-    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values, new SamzaSqlRelMsgMetadata("", "", ""));
     Assert.assertEquals(values.get(0), message.getSamzaSqlRelRecord().getField(names.get(0)).get());
     Assert.assertEquals(values.get(1), message.getSamzaSqlRelRecord().getField(names.get(1)).get());
   }
 
   @Test
   public void testGetNonExistentField() {
-    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values, new SamzaSqlRelMsgMetadata("", "", ""));
     Assert.assertFalse(message.getSamzaSqlRelRecord().getField("field3").isPresent());
   }
 
   @Test
   public void testEquality() {
-    SamzaSqlRelMessage message1 = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message1 = new SamzaSqlRelMessage(names, values, new SamzaSqlRelMsgMetadata("", "", ""));
     SamzaSqlRelMessage message2 =
-        new SamzaSqlRelMessage(Arrays.asList("field1", "field2"), Arrays.asList("value1", "value2"));
+        new SamzaSqlRelMessage(Arrays.asList("field1", "field2"), Arrays.asList("value1", "value2"),
+            new SamzaSqlRelMsgMetadata("", "", ""));
     Assert.assertEquals(message1, message2);
     Assert.assertEquals(message1.hashCode(), message2.hashCode());
   }
 
   @Test
   public void testInEquality() {
-    SamzaSqlRelMessage message1 = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message1 = new SamzaSqlRelMessage(names, values, new SamzaSqlRelMsgMetadata("", "", ""));
     SamzaSqlRelMessage message2 =
-        new SamzaSqlRelMessage(Arrays.asList("field1", "field2"), Arrays.asList("value2", "value2"));
+        new SamzaSqlRelMessage(Arrays.asList("field1", "field2"), Arrays.asList("value2", "value2"),
+            new SamzaSqlRelMsgMetadata("", "", ""));
     Assert.assertNotEquals(message1, message2);
     Assert.assertNotEquals(message1.hashCode(), message2.hashCode());
   }
@@ -67,7 +70,7 @@ public class TestSamzaSqlRelMessage {
   @Test
   public void testCompositeKeyCreation() {
     List<String> keyPartNames = Arrays.asList("kfield1", "kfield2");
-    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values, new SamzaSqlRelMsgMetadata("", "", ""));
 
     SamzaSqlRelRecord relRecord1 = SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Collections.singletonList(0));
     Assert.assertEquals(relRecord1.getFieldNames().size(), 1);
@@ -86,7 +89,7 @@ public class TestSamzaSqlRelMessage {
   @Test (expected = IllegalArgumentException.class)
   public void testCompositeKeyCreationWithInEqualKeyNameValues() {
     List<String> keyPartNames = Arrays.asList("kfield1", "kfield2");
-    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values, new SamzaSqlRelMsgMetadata("", "", ""));
 
     SamzaSqlRelRecord relRecord1 = SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Arrays.asList(1, 0),
         SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames(keyPartNames, Arrays.asList(1)));

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java
index a159e2f..5afa4fe 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.serializers;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.samza.sql.avro.schemas.AddressRecord;
 import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.StreetNumRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
@@ -51,7 +53,7 @@ public class TestSamzaSqlRelMessageSerde {
 
   @Test
   public void testWithDifferentFields() {
-    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values, new SamzaSqlRelMsgMetadata("", "", ""));
     SamzaSqlRelMessageSerde serde =
         (SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
     SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(message));

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java
index d264f01..4e6ff4f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.serializers;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.samza.sql.avro.AvroRelSchemaProvider;
 import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
 import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
 import org.junit.Test;
@@ -48,7 +50,8 @@ public class TestSamzaSqlRelRecordSerde {
 
   @Test
   public void testWithDifferentFields() {
-    SamzaSqlRelRecord record = new SamzaSqlRelMessage(names, values).getSamzaSqlRelRecord();
+    SamzaSqlRelRecord record =
+        new SamzaSqlRelMessage(names, values, new SamzaSqlRelMsgMetadata("", "", "")).getSamzaSqlRelRecord();
     SamzaSqlRelRecordSerde serde =
         (SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
     SamzaSqlRelRecord resultRecord = serde.fromBytes(serde.toBytes(record));

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
index a652885..e804113 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -20,6 +20,7 @@
 package org.apache.samza.sql.translator;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -27,6 +28,7 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
+import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
@@ -37,7 +39,9 @@ import org.apache.samza.sql.data.Expression;
 import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
+import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.internal.util.reflection.Whitebox;
@@ -63,12 +67,17 @@ import static org.mockito.Mockito.when;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(LogicalFilter.class)
 public class TestFilterTranslator extends TranslatorTestBase {
+  final private String LOGICAL_OP_ID = "sql0_filter_0";
+
 
   @Test
   public void testTranslate() throws IOException, ClassNotFoundException {
     // setup mock values to the constructor of FilterTranslator
     LogicalFilter mockFilter = PowerMockito.mock(LogicalFilter.class);
-    TranslatorContext mockContext = mock(TranslatorContext.class);
+    Context mockContext = mock(Context.class);
+    ContainerContext mockContainerContext = mock(ContainerContext.class);
+    TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
+    TestMetricsRegistryImpl metricsRegistry = new TestMetricsRegistryImpl();
     RelNode mockInput = mock(RelNode.class);
     when(mockFilter.getInput()).thenReturn(mockInput);
     when(mockInput.getId()).thenReturn(1);
@@ -76,43 +85,52 @@ public class TestFilterTranslator extends TranslatorTestBase {
     StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
     MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockGraph, mockInputOp);
-    when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
-    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+    when(mockTranslatorContext.getMessageStream(eq(1))).thenReturn(mockStream);
+    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockTranslatorContext).registerMessageStream(eq(2), any(MessageStream.class));
     RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
-    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+    when(mockTranslatorContext.getExpressionCompiler()).thenReturn(mockCompiler);
     Expression mockExpr = mock(Expression.class);
     when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+    when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
+    when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
 
     // Apply translate() method to verify that we are getting the correct filter operator constructed
     FilterTranslator filterTranslator = new FilterTranslator(1);
-    filterTranslator.translate(mockFilter, mockContext);
+    filterTranslator.translate(mockFilter, LOGICAL_OP_ID, mockTranslatorContext);
     // make sure that context has been registered with LogicFilter and output message streams
-    verify(mockContext, times(1)).registerRelNode(2, mockFilter);
-    verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
-    when(mockContext.getRelNode(2)).thenReturn(mockFilter);
-    when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+    verify(mockTranslatorContext, times(1)).registerRelNode(2, mockFilter);
+    verify(mockTranslatorContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+    when(mockTranslatorContext.getRelNode(2)).thenReturn(mockFilter);
+    when(mockTranslatorContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
     StreamOperatorSpec filterSpec = (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
     assertNotNull(filterSpec);
     assertEquals(filterSpec.getOpCode(), OperatorSpec.OpCode.FILTER);
 
     // Verify that the describe() method will establish the context for the filter function
-    Context context = mock(Context.class);
     Map<Integer, TranslatorContext> mockContexts= new HashMap<>();
-    mockContexts.put(1, mockContext);
-    when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
-    filterSpec.getTransformFn().init(context);
+    mockContexts.put(1, mockTranslatorContext);
+    when(mockContext.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
+    filterSpec.getTransformFn().init(mockContext);
     FilterFunction filterFn = (FilterFunction) Whitebox.getInternalState(filterSpec, "filterFn");
     assertNotNull(filterFn);
-    assertEquals(mockContext, Whitebox.getInternalState(filterFn, "context"));
+    assertEquals(mockTranslatorContext, Whitebox.getInternalState(filterFn, "translatorContext"));
     assertEquals(mockFilter, Whitebox.getInternalState(filterFn, "filter"));
     assertEquals(mockExpr, Whitebox.getInternalState(filterFn, "expr"));
+    // Verify MetricsRegistry works with Filter
+    assertEquals(1, metricsRegistry.getGauges().size());
+    assertTrue(metricsRegistry.getGauges().get(LOGICAL_OP_ID).size() > 0);
+    assertEquals(1, metricsRegistry.getCounters().size());
+    assertEquals(3, metricsRegistry.getCounters().get(LOGICAL_OP_ID).size());
+    assertEquals(0, metricsRegistry.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
+    assertEquals(0, metricsRegistry.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
 
     // Calling filterFn.apply() to verify the filter function is correctly applied to the input message
-    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
+    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>(),
+        new SamzaSqlRelMsgMetadata("", "", ""));
     SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
     DataContext dataContext = mock(DataContext.class);
-    when(mockContext.getExecutionContext()).thenReturn(executionContext);
-    when(mockContext.getDataContext()).thenReturn(dataContext);
+    when(mockTranslatorContext.getExecutionContext()).thenReturn(executionContext);
+    when(mockTranslatorContext.getDataContext()).thenReturn(dataContext);
     Object[] result = new Object[1];
 
     doAnswer( invocation -> {
@@ -131,6 +149,10 @@ public class TestFilterTranslator extends TranslatorTestBase {
         eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), eq(result));
     assertFalse(filterFn.apply(mockInputMsg));
 
+    // Verify filterFn.apply() updates the MetricsRegistry metrics
+    assertEquals(2, metricsRegistry.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
+    assertEquals(1, metricsRegistry.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index 85fe580..090fb90 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -92,7 +92,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
   private void testTranslateStreamToTableJoin(boolean isRemoteTable) throws IOException, ClassNotFoundException {
     // setup mock values to the constructor of JoinTranslator
     LogicalJoin mockJoin = PowerMockito.mock(LogicalJoin.class);
-    TranslatorContext mockContext = mock(TranslatorContext.class);
+    TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
     RelNode mockLeftInput = PowerMockito.mock(EnumerableTableScan.class);
     RelNode mockRightInput = mock(RelNode.class);
     List<RelNode> inputs = new ArrayList<>();
@@ -148,20 +148,20 @@ public class TestJoinTranslator extends TranslatorTestBase {
     StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec<Object, SamzaSqlRelMessage> mockLeftInputOp = mock(OperatorSpec.class);
     MessageStream<SamzaSqlRelMessage> mockLeftInputStream = new MessageStreamImpl<>(mockAppDesc, mockLeftInputOp);
-    when(mockContext.getMessageStream(eq(mockLeftInput.getId()))).thenReturn(mockLeftInputStream);
+    when(mockTranslatorContext.getMessageStream(eq(mockLeftInput.getId()))).thenReturn(mockLeftInputStream);
     OperatorSpec<Object, SamzaSqlRelMessage> mockRightInputOp = mock(OperatorSpec.class);
     MessageStream<SamzaSqlRelMessage> mockRightInputStream = new MessageStreamImpl<>(mockAppDesc, mockRightInputOp);
-    when(mockContext.getMessageStream(eq(mockRightInput.getId()))).thenReturn(mockRightInputStream);
-    when(mockContext.getStreamAppDescriptor()).thenReturn(mockAppDesc);
+    when(mockTranslatorContext.getMessageStream(eq(mockRightInput.getId()))).thenReturn(mockRightInputStream);
+    when(mockTranslatorContext.getStreamAppDescriptor()).thenReturn(mockAppDesc);
 
     InputOperatorSpec mockInputOp = mock(InputOperatorSpec.class);
     OutputStreamImpl mockOutputStream = mock(OutputStreamImpl.class);
     when(mockInputOp.isKeyed()).thenReturn(true);
     when(mockOutputStream.isKeyed()).thenReturn(true);
 
-    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(3), any(MessageStream.class));
+    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockTranslatorContext).registerMessageStream(eq(3), any(MessageStream.class));
     RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
-    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+    when(mockTranslatorContext.getExpressionCompiler()).thenReturn(mockCompiler);
     Expression mockExpr = mock(Expression.class);
     when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
 
@@ -176,7 +176,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
     when(mockJoin.getJoinType()).thenReturn(JoinRelType.INNER);
 
     SamzaSqlExecutionContext mockExecutionContext = mock(SamzaSqlExecutionContext.class);
-    when(mockContext.getExecutionContext()).thenReturn(mockExecutionContext);
+    when(mockTranslatorContext.getExecutionContext()).thenReturn(mockExecutionContext);
 
     SamzaSqlApplicationConfig mockAppConfig = mock(SamzaSqlApplicationConfig.class);
     when(mockExecutionContext.getSamzaSqlApplicationConfig()).thenReturn(mockAppConfig);
@@ -195,12 +195,13 @@ public class TestJoinTranslator extends TranslatorTestBase {
     when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));
 
     // Apply translate() method to verify that we are getting the correct map operator constructed
-    JoinTranslator joinTranslator = new JoinTranslator("sql0_join3", "", 0);
-    joinTranslator.translate(mockJoin, mockContext);
+    String logicalOpId = "sql0_join3";
+    JoinTranslator joinTranslator = new JoinTranslator(logicalOpId, "", 0);
+    joinTranslator.translate(mockJoin, mockTranslatorContext);
     // make sure that context has been registered with LogicFilter and output message streams
-    verify(mockContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));
-    when(mockContext.getRelNode(3)).thenReturn(mockJoin);
-    when(mockContext.getMessageStream(3)).thenReturn(this.getRegisteredMessageStream(3));
+    verify(mockTranslatorContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));
+    when(mockTranslatorContext.getRelNode(3)).thenReturn(mockJoin);
+    when(mockTranslatorContext.getMessageStream(3)).thenReturn(this.getRegisteredMessageStream(3));
     StreamTableJoinOperatorSpec
         joinSpec = (StreamTableJoinOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(3), "operatorSpec");
     assertNotNull(joinSpec);

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index b6b2ddd..29ce6d0 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -19,6 +19,7 @@
 package org.apache.samza.sql.translator;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -47,6 +48,7 @@ import org.apache.samza.sql.data.Expression;
 import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
 import org.apache.samza.util.NoOpMetricsRegistry;
@@ -135,11 +137,13 @@ public class TestProjectTranslator extends TranslatorTestBase {
     assertEquals(1, testMetricsRegistryImpl.getGauges().size());
     assertEquals(2, testMetricsRegistryImpl.getGauges().get(LOGICAL_OP_ID).size());
     assertEquals(1, testMetricsRegistryImpl.getCounters().size());
-    assertEquals(1, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).size());
+    assertEquals(2, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).size());
     assertEquals(0, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
+    assertEquals(0, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
 
     // Calling mapFn.apply() to verify the filter function is correctly applied to the input message
-    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
+    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>(),
+        new SamzaSqlRelMsgMetadata("", "", ""));
     SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
     DataContext dataContext = mock(DataContext.class);
     when(mockTranslatorContext.getExecutionContext()).thenReturn(executionContext);
@@ -164,6 +168,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
 
     // Verify mapFn.apply() updates the TestMetricsRegistryImpl metrics
     assertEquals(1, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
+    assertEquals(1, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
 
   }
 
@@ -174,7 +179,8 @@ public class TestProjectTranslator extends TranslatorTestBase {
     TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
     Context mockContext = mock(Context.class);
     ContainerContext mockContainerContext = mock(ContainerContext.class);
-    NoOpMetricsRegistry noOpMetricsRegistry = new NoOpMetricsRegistry();
+    TestMetricsRegistryImpl testMetricsRegistryImpl = new TestMetricsRegistryImpl();
+
     RelNode mockInput = mock(RelNode.class);
     List<RelNode> inputs = new ArrayList<>();
     inputs.add(mockInput);
@@ -243,7 +249,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
       this.add("test_field_no1");
     }}, new ArrayList<Object>() {{
       this.add(testObj);
-    }});
+    }}, new SamzaSqlRelMsgMetadata("", "", ""));
     Collection<SamzaSqlRelMessage> flattenedMsgs = flattenOp.getTransformFn().apply(mockMsg);
     assertTrue(flattenedMsgs.size() == 1);
     assertTrue(flattenedMsgs.stream().anyMatch(s -> s.getSamzaSqlRelRecord().getFieldValues().get(0).equals(testObj)));
@@ -255,7 +261,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
       this.add("test_list_field1");
     }}, new ArrayList<Object>() {{
       this.add(testList);
-    }});
+    }}, new SamzaSqlRelMsgMetadata("", "", ""));
     flattenedMsgs = flattenOp.getTransformFn().apply(mockMsg);
     assertTrue(flattenedMsgs.size() == 10);
     List<Integer> actualList = flattenedMsgs.stream()
@@ -269,7 +275,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
 
     // Verify that the describe() method will establish the context for the map function
     when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
-    when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(noOpMetricsRegistry);
+    when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(testMetricsRegistryImpl);
     Map<Integer, TranslatorContext> mockContexts= new HashMap<>();
     mockContexts.put(1, mockTranslatorContext);
     when(mockContext.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
@@ -279,9 +285,23 @@ public class TestProjectTranslator extends TranslatorTestBase {
     assertEquals(mockTranslatorContext, Whitebox.getInternalState(mapFn, "translatorContext"));
     assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
     assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
+    // Verify TestMetricsRegistryImpl works with Project
+    assertEquals(1, testMetricsRegistryImpl.getGauges().size());
+    assertEquals(2, testMetricsRegistryImpl.getGauges().get(LOGICAL_OP_ID).size());
+    assertEquals(1, testMetricsRegistryImpl.getCounters().size());
+    assertEquals(2, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).size());
+    assertEquals(0, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
+    assertEquals(0, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
+    // Verify mapFn.apply() updates the TestMetricsRegistryImpl metrics
+    for (SamzaSqlRelMessage message : flattenedMsgs) {
+      mapFn.apply(message);
+    }
+    assertEquals(1, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
+    assertEquals(10, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
 
     // Calling mapFn.apply() to verify the filter function is correctly applied to the input message
-    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
+    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>(),
+        new SamzaSqlRelMsgMetadata("", "", ""));
     SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
     DataContext dataContext = mock(DataContext.class);
     when(mockTranslatorContext.getExecutionContext()).thenReturn(executionContext);

http://git-wip-us.apache.org/repos/asf/samza/blob/bd9387b7/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index d9039ec..c81a1fc 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -30,6 +30,8 @@ import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
@@ -38,22 +40,28 @@ import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.testutil.JsonUtil;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
+import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.fetchQueryInfo;
-import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.fetchSqlFromConfig;
-import static org.junit.Assert.assertTrue;
+import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 
 public class TestQueryTranslator {
 
   private final Map<String, String> configs = new HashMap<>();
+  private final Context mockContext = mock(Context.class);
+  private final ContainerContext mockContainerContext = mock(ContainerContext.class);
+  private TestMetricsRegistryImpl metricsRegistry = new TestMetricsRegistryImpl();
 
   @Before
   public void setUp() {
     configs.put("job.default.system", "kafka");
+    when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
+    when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
   }
 
   @Test
@@ -72,7 +80,6 @@ public class TestQueryTranslator {
 
     StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
     QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig);
-
     translator.translate(queryInfo.get(0), appDesc, 0);
     OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
 
@@ -649,9 +656,9 @@ public class TestQueryTranslator {
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql0_join0", output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_2", output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql0_join0", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_2", output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -661,9 +668,9 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PROFILE", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql0_join0", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_2", input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql0_join0", input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_2", input4PhysicalName);
   }
 
   @Test
@@ -717,9 +724,9 @@ public class TestQueryTranslator {
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -729,9 +736,9 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PROFILE", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", input4PhysicalName);
   }
 
   @Test
@@ -784,9 +791,9 @@ public class TestQueryTranslator {
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -796,9 +803,9 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PAGEVIEW", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", input4PhysicalName);
   }
 
   @Test