You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/11/20 22:05:52 UTC

samza git commit: SAMZA-1997: Samza-sql diagnostics - instrument project operator

Repository: samza
Updated Branches:
  refs/heads/master a6184f1e2 -> cce2b6f2e


SAMZA-1997: Samza-sql diagnostics - instrument project operator

Users of Samza-SQL write their Samza jobs in a high-level declarative language (SQL) for ease and speed of implementation. Monitoring should therefore expose metrics at that same high (logical) level, which is the goal of the Samza-SQL diagnostics project. As a first step, this change instruments the Project operator to provide run-time metrics.

Author: Shenoda Guirguis <sg...@sguirgui-ld2.linkedin.biz>

Reviewers: Srinivasulu Punuru <sp...@linkedin.com>, Aditya Toomula <at...@linkedin.com>

Closes #806 from shenodaguirguis/samza-sql-diagnostics


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cce2b6f2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cce2b6f2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cce2b6f2

Branch: refs/heads/master
Commit: cce2b6f2ec080f6e79dc0d85d82eba9443be9710
Parents: a6184f1
Author: Shenoda Guirguis <sg...@sguirgui-ld2.linkedin.biz>
Authored: Tue Nov 20 14:05:47 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Tue Nov 20 14:05:47 2018 -0800

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../apache/samza/metrics/SamzaHistogram.java    |  79 +++++++++++++
 .../consumer/EventHubSystemConsumer.java        |   2 +-
 .../system/eventhub/metrics/SamzaHistogram.java |  83 -------------
 .../eventhub/producer/AsyncSystemProducer.java  |   2 +-
 .../samza/sql/translator/JoinTranslator.java    |  10 +-
 .../translator/LogicalAggregateTranslator.java  |   8 +-
 .../samza/sql/translator/ProjectTranslator.java |  65 +++++++++--
 .../samza/sql/translator/QueryTranslator.java   |  13 ++-
 .../sql/testutil/TestMetricsRegistryImpl.java   | 117 +++++++++++++++++++
 .../sql/translator/TestJoinTranslator.java      |   2 +-
 .../sql/translator/TestProjectTranslator.java   |  92 +++++++++------
 .../sql/translator/TestQueryTranslator.java     |  24 ++--
 13 files changed, 339 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d0cd3ef..7011098 100644
--- a/build.gradle
+++ b/build.gradle
@@ -145,6 +145,7 @@ project(':samza-api') {
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     compile "com.google.guava:guava:$guavaVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
+    compile "io.dropwizard.metrics:metrics-core:3.1.2"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java b/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
new file mode 100644
index 0000000..85835f9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Creates a {@link Histogram} metric using {@link ExponentiallyDecayingReservoir}
+ * Keeps a {@link Gauge} for each percentile
+ */
+public class SamzaHistogram {
+  private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D);
+  private final Histogram histogram;
+  private final List<Double> percentiles;
+  private final Map<Double, Gauge<Double>> gauges;
+
+  public SamzaHistogram(MetricsRegistry registry, String group, String name) {
+    this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES);
+  }
+
+  public SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
+    this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
+    this.percentiles = percentiles;
+    this.gauges = this.percentiles.stream()
+        .filter(x -> x > 0 && x <= 100)
+        .collect(Collectors.toMap(Function.identity(),
+            x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
+  }
+
+  public void update(long value) {
+    histogram.update(value);
+  }
+
+  public void updateGaugeValues(double percentile) {
+    Snapshot values = histogram.getSnapshot();
+    gauges.get(percentile).set(values.getValue(percentile / 100));
+  }
+
+  /**
+   * Custom gauge whose value is set based on the underlying Histogram
+   */
+  private class HistogramGauge extends Gauge<Double> {
+    private final Double percentile;
+
+    public HistogramGauge(Double percentile, String name, double value) {
+      super(name, value);
+      this.percentile = percentile;
+    }
+
+    public void visit(MetricsVisitor visitor) {
+      updateGaugeValues(percentile);
+      visitor.gauge(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index a05b5e2..df98d5b 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -55,7 +55,7 @@ import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
 import org.apache.samza.system.eventhub.EventHubConfig;
 import org.apache.samza.system.eventhub.Interceptor;
 import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
-import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.apache.samza.metrics.SamzaHistogram;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
 import org.apache.samza.util.BlockingEnvelopeMap;
 import org.apache.samza.util.Clock;

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
deleted file mode 100644
index 03fc114..0000000
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.eventhub.metrics;
-
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Snapshot;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.MetricsVisitor;
-
-
-/**
- * Creates a {@link Histogram} metric using {@link ExponentiallyDecayingReservoir}
- * Keeps a {@link Gauge} for each percentile
- */
-public class SamzaHistogram {
-  private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D);
-  private final Histogram histogram;
-  private final List<Double> percentiles;
-  private final Map<Double, Gauge<Double>> gauges;
-
-  public SamzaHistogram(MetricsRegistry registry, String group, String name) {
-    this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES);
-  }
-
-  public SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
-    this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
-    this.percentiles = percentiles;
-    this.gauges = this.percentiles.stream()
-        .filter(x -> x > 0 && x <= 100)
-        .collect(Collectors.toMap(Function.identity(),
-            x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
-  }
-
-  public void update(long value) {
-    histogram.update(value);
-  }
-
-  public void updateGaugeValues(double percentile) {
-    Snapshot values = histogram.getSnapshot();
-    gauges.get(percentile).set(values.getValue(percentile / 100));
-  }
-
-  /**
-   * Custom gauge whose value is set based on the underlying Histogram
-   */
-  private class HistogramGauge extends Gauge<Double> {
-    private final Double percentile;
-
-    public HistogramGauge(Double percentile, String name, double value) {
-      super(name, value);
-      this.percentile = percentile;
-    }
-
-    public void visit(MetricsVisitor visitor) {
-      updateGaugeValues(percentile);
-      visitor.gauge(this);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
index 26b58d6..83d51ed 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
@@ -40,7 +40,7 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.eventhub.EventHubConfig;
-import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.apache.samza.metrics.SamzaHistogram;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 440d598..7b8ff58 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -74,12 +74,12 @@ import static org.apache.samza.sql.translator.SamzaSqlTableJoinFunction.*;
 class JoinTranslator {
 
   private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
-  private int joinId;
+  private String logicalOpId;
   private final String intermediateStreamPrefix;
   private final int queryId;
 
-  JoinTranslator(int joinId, String intermediateStreamPrefix, int queryId) {
-    this.joinId = joinId;
+  JoinTranslator(String logicalOpId, String intermediateStreamPrefix, int queryId) {
+    this.logicalOpId = logicalOpId;
     this.intermediateStreamPrefix = intermediateStreamPrefix + (intermediateStreamPrefix.isEmpty() ? "" : "_");
     this.queryId = queryId;
   }
@@ -156,7 +156,7 @@ class JoinTranslator {
         inputStream
             .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds,
             getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)), m -> m, KVSerde.of(keySerde, valueSerde),
-            intermediateStreamPrefix + "stream_" + joinId)
+            intermediateStreamPrefix + "stream_" + logicalOpId)
             .map(KV::getValue)
             .join(table, joinFn);
   }
@@ -363,7 +363,7 @@ class JoinTranslator {
 
     relOutputStream
         .partitionBy(m -> createSamzaSqlCompositeKey(m, tableKeyIds), m -> m,
-            KVSerde.of(keySerde, valueSerde), intermediateStreamPrefix + "table_" + joinId)
+            KVSerde.of(keySerde, valueSerde), intermediateStreamPrefix + "table_" + logicalOpId)
         .sendTo(table);
 
     return table;

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
index 40a08ff..ac3d4e9 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
@@ -44,11 +44,11 @@ import org.slf4j.LoggerFactory;
 class LogicalAggregateTranslator {
 
   private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
-  private int windowId;
+  private String logicalOpId;
   private String changeLogStorePrefix;
 
-  LogicalAggregateTranslator(int windowId, String changeLogStorePrefix) {
-    this.windowId = windowId;
+  LogicalAggregateTranslator(String logicalOpId, String changeLogStorePrefix) {
+    this.logicalOpId = logicalOpId;
     this.changeLogStorePrefix = changeLogStorePrefix + (changeLogStorePrefix.isEmpty() ? "" : "_");
   }
 
@@ -72,7 +72,7 @@ class LogicalAggregateTranslator {
                 new SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde(),
                 new LongSerde())
                 .setAccumulationMode(
-                    AccumulationMode.DISCARDING), changeLogStorePrefix + "_tumblingWindow_" + windowId)
+                    AccumulationMode.DISCARDING), changeLogStorePrefix + "_tumblingWindow_" + logicalOpId)
             .map(windowPane -> {
                 List<String> fieldNames = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldNames();
                 List<Object> fieldValues = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldValues();

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 29b1935..990d7db 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.sql.translator;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,7 +34,11 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.samza.SamzaException;
+import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SamzaHistogram;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.sql.data.Expression;
@@ -53,39 +60,77 @@ class ProjectTranslator {
     this.queryId = queryId;
   }
 
-  private static class ProjectMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
+  /**
+   * ProjectMapFunction implements MapFunction to map input SamzaSqlRelMessages, one at a time, to a new
+   * SamzaSqlRelMessage which consists of the projected fields
+   */
+  @VisibleForTesting
+  public static class ProjectMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
     private transient Project project;
     private transient Expression expr;
-    private transient TranslatorContext context;
+    private transient TranslatorContext translatorContext;
+    private transient MetricsRegistry metricsRegistry;
+    private transient SamzaHistogram processingTime; // micro-seconds (nanos / 1000)
+    private transient Counter numEvents;
 
     private final int queryId;
     private final int projectId;
+    private final String logicalOpId;
+    private final String PROCESSING_TIME_NAME = "processingTime";
+    private final String NUM_EVENTS_NAME = "numEvents";
 
-    ProjectMapFunction(int projectId, int queryId) {
+    ProjectMapFunction(int projectId, int queryId, String logicalOpId) {
       this.projectId = projectId;
       this.queryId = queryId;
+      this.logicalOpId = logicalOpId;
     }
 
+    /**
+     * initializes the ProjectMapFunction before any message is processed
+     * @param context the {@link Context} for this task
+     */
     @Override
     public void init(Context context) {
-      this.context = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
-      this.project = (Project) this.context.getRelNode(projectId);
-      this.expr = this.context.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
+      this.translatorContext = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
+      this.project = (Project) this.translatorContext.getRelNode(projectId);
+      this.expr = this.translatorContext.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
+      ContainerContext containerContext = context.getContainerContext();
+      metricsRegistry = containerContext.getContainerMetricsRegistry();
+      processingTime = new SamzaHistogram(metricsRegistry, logicalOpId, PROCESSING_TIME_NAME);
+      numEvents = metricsRegistry.newCounter(logicalOpId, NUM_EVENTS_NAME);
+      numEvents.clear();
     }
 
+    /**
+     * transforms the input message into the output message with projected fields
+     * @param message  the input message to be transformed
+     * @return the new SamzaSqlRelMessage message
+     */
     @Override
     public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
+      Instant arrivalTime = Instant.now();
       RelDataType type = project.getRowType();
       Object[] output = new Object[type.getFieldCount()];
-      expr.execute(context.getExecutionContext(), context.getDataContext(),
+      expr.execute(translatorContext.getExecutionContext(), translatorContext.getDataContext(),
           message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
       List<String> names = new ArrayList<>();
       for (int index = 0; index < output.length; index++) {
         names.add(index, project.getNamedProjects().get(index).getValue());
       }
-
+      updateMetrics(arrivalTime, Instant.now());
       return new SamzaSqlRelMessage(names, Arrays.asList(output));
     }
+
+    /**
+     * Updates the Diagnostics Metrics (processing time and number of events)
+     * @param arrivalTime input message arrival time (=beginning of processing in this operator)
+     * @param outputTime output message output time (=end of processing in this operator)
+     */
+    private void updateMetrics(Instant arrivalTime, Instant outputTime) {
+      numEvents.inc();
+      processingTime.update(Duration.between(arrivalTime, outputTime).toNanos() / 1000L);
+    }
+
   }
 
   private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer flattenIndex,
@@ -116,7 +161,7 @@ class ProjectTranslator {
     return ((RexInputRef) ((RexCall) rexNode).getOperands().get(0)).getIndex();
   }
 
-  void translate(final Project project, final TranslatorContext context) {
+  void translate(final Project project, final String logicalOpId, final TranslatorContext context) {
     MessageStream<SamzaSqlRelMessage> messageStream = context.getMessageStream(project.getInput().getId());
     List<Integer> flattenProjects =
         project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList());
@@ -132,7 +177,7 @@ class ProjectTranslator {
 
     final int projectId = project.getId();
 
-    MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(new ProjectMapFunction(projectId, queryId));
+    MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(new ProjectMapFunction(projectId, queryId, logicalOpId));
 
     context.registerMessageStream(project.getId(), outputStream);
     context.registerRelNode(project.getId(), project);

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 33781a6..bb34a41 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -45,7 +45,6 @@ import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
@@ -131,6 +130,7 @@ public class QueryTranslator {
     node.accept(new RelShuttleImpl() {
       int windowId = 0;
       int joinId = 0;
+      int opId = 0;
 
       @Override
       public RelNode visit(RelNode relNode) {
@@ -166,15 +166,16 @@ public class QueryTranslator {
       @Override
       public RelNode visit(LogicalProject project) {
         RelNode node = super.visit(project);
-        new ProjectTranslator(queryId).translate(project, translatorContext);
+        String logicalOpId = "sql" + Integer.toString(queryId) + "_project" + Integer.toString(opId++);
+        new ProjectTranslator(queryId).translate(project, logicalOpId, translatorContext);
         return node;
       }
 
       @Override
       public RelNode visit(LogicalJoin join) {
         RelNode node = super.visit(join);
-        joinId++;
-        new JoinTranslator(joinId, sqlConfig.getMetadataTopicPrefix(), queryId)
+        String logicalOpId = "sql" + Integer.toString(queryId) + "_join" + Integer.toString(opId++);
+        new JoinTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix(), queryId)
             .translate(join, translatorContext);
         return node;
       }
@@ -182,8 +183,8 @@ public class QueryTranslator {
       @Override
       public RelNode visit(LogicalAggregate aggregate) {
         RelNode node = super.visit(aggregate);
-        windowId++;
-        new LogicalAggregateTranslator(windowId, sqlConfig.getMetadataTopicPrefix())
+        String logicalOpId = "sql" + Integer.toString(queryId) + "_window" + Integer.toString(opId++);
+        new LogicalAggregateTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix())
             .translate(aggregate, translatorContext);
         return node;
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
new file mode 100644
index 0000000..618ca50
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.testutil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.metrics.Timer;
+
+
+/**
+ * TestMetricsRegistryImpl implements the MetricsRegistry interface and adds get APIs
+ * for testing Translators.
+ */
+public class TestMetricsRegistryImpl implements org.apache.samza.metrics.MetricsRegistry {
+  private Map<String, List<Counter>> counters = new HashMap<>();
+  private Map<String, List<Timer>> timers = new HashMap<>();
+  private Map<String, List<Gauge<?>>> gauges = new HashMap<>();
+  private Map<String, List<ListGauge>> listGauges = new HashMap<>();
+
+  @Override
+  public Counter newCounter(String group, String name) {
+    Counter counter = new Counter(name);
+    return newCounter(group, counter);
+  }
+
+  @Override
+  public Counter newCounter(String group, Counter counter) {
+    if (!counters.containsKey(group)) {
+      counters.put(group, new ArrayList<>());
+    }
+    counters.get(group).add(counter);
+    return counter;
+  }
+
+  /**
+   * retrieves the Map of Counters
+   * @return counters
+   */
+  public Map<String, List<Counter>> getCounters() {
+    return counters;
+  }
+
+  @Override
+  public Timer newTimer(String group, String name) {
+    Timer timer = new Timer(name);
+    return newTimer(group, timer);
+  }
+
+  @Override
+  public Timer newTimer(String group, Timer timer) {
+    if (!timers.containsKey(group)) {
+      timers.put(group, new ArrayList<>());
+    }
+    timers.get(group).add(timer);
+    return timer;
+  }
+
+  /**
+   * retrieves the Map of Timers
+   * @return timers
+   */
+  public Map<String, List<Timer>> getTimers() {
+    return timers;
+  }
+
+  @Override
+  public <T> Gauge<T> newGauge(String group, String name, T value) {
+    Gauge<T> gauge = new Gauge<>(name, value);
+    return newGauge(group, gauge);
+  }
+
+  @Override
+  public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
+    if (!gauges.containsKey(group)) {
+      gauges.put(group, new ArrayList<>());
+    }
+    gauges.get(group).add(gauge);
+    return gauge;
+  }
+
+  /**
+   * retrieves the Map of Gauges
+   * @return gauges
+   */
+  public Map<String, List<Gauge<?>>> getGauges() {
+    return gauges;
+  }
+
+  @Override
+  public ListGauge newListGauge(String group, ListGauge listGauge) {
+    listGauges.putIfAbsent(group, new ArrayList());
+    listGauges.get(group).add(listGauge);
+    return listGauge;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index d496982..85fe580 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -195,7 +195,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
     when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));
 
     // Apply translate() method to verify that we are getting the correct map operator constructed
-    JoinTranslator joinTranslator = new JoinTranslator(3, "", 0);
+    JoinTranslator joinTranslator = new JoinTranslator("sql0_join3", "", 0);
     joinTranslator.translate(mockJoin, mockContext);
     // make sure that context has been registered with LogicFilter and output message streams
     verify(mockContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 3771dbb..b6b2ddd 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.calcite.util.Pair;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
+import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
@@ -47,16 +48,15 @@ import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
+import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
+import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
@@ -72,11 +72,16 @@ import static org.mockito.Mockito.when;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(LogicalProject.class)
 public class TestProjectTranslator extends TranslatorTestBase {
+  private static final String LOGICAL_OP_ID = "sql0_project_0"; // operator id passed to translate(); metrics are looked up under it
   @Test
   public void testTranslate() throws IOException, ClassNotFoundException {
     // setup mock values to the constructor of FilterTranslator
     LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
-    TranslatorContext mockContext = mock(TranslatorContext.class);
+    Context mockContext = mock(Context.class);
+    ContainerContext mockContainerContext = mock(ContainerContext.class);
+    TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
+    TestMetricsRegistryImpl testMetricsRegistryImpl = new TestMetricsRegistryImpl();
+
     RelNode mockInput = mock(RelNode.class);
     List<RelNode> inputs = new ArrayList<>();
     inputs.add(mockInput);
@@ -94,43 +99,51 @@ public class TestProjectTranslator extends TranslatorTestBase {
     StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
     MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp);
-    when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
-    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+    when(mockTranslatorContext.getMessageStream(eq(1))).thenReturn(mockStream);
+    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockTranslatorContext).registerMessageStream(eq(2), any(MessageStream.class));
     RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
-    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+    when(mockTranslatorContext.getExpressionCompiler()).thenReturn(mockCompiler);
     Expression mockExpr = mock(Expression.class);
     when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+    when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
+    when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(testMetricsRegistryImpl);
 
     // Apply translate() method to verify that we are getting the correct map operator constructed
     ProjectTranslator projectTranslator = new ProjectTranslator(1);
-    projectTranslator.translate(mockProject, mockContext);
+    projectTranslator.translate(mockProject, LOGICAL_OP_ID, mockTranslatorContext);
     // make sure that context has been registered with LogicFilter and output message streams
-    verify(mockContext, times(1)).registerRelNode(2, mockProject);
-    verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
-    when(mockContext.getRelNode(2)).thenReturn(mockProject);
-    when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+    verify(mockTranslatorContext, times(1)).registerRelNode(2, mockProject);
+    verify(mockTranslatorContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+    when(mockTranslatorContext.getRelNode(2)).thenReturn(mockProject);
+    when(mockTranslatorContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
     StreamOperatorSpec projectSpec = (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
     assertNotNull(projectSpec);
     assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
 
     // Verify that the bootstrap() method will establish the context for the map function
-    Context context = mock(Context.class);
     Map<Integer, TranslatorContext> mockContexts= new HashMap<>();
-    mockContexts.put(1, mockContext);
-    when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
-    projectSpec.getTransformFn().init(context);
+    mockContexts.put(1, mockTranslatorContext);
+    when(mockContext.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
+    projectSpec.getTransformFn().init(mockContext);
     MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
+
     assertNotNull(mapFn);
-    assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
+    assertEquals(mockTranslatorContext, Whitebox.getInternalState(mapFn, "translatorContext"));
     assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
     assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
+    // Verify TestMetricsRegistryImpl works with Project
+    assertEquals(1, testMetricsRegistryImpl.getGauges().size());
+    assertEquals(2, testMetricsRegistryImpl.getGauges().get(LOGICAL_OP_ID).size());
+    assertEquals(1, testMetricsRegistryImpl.getCounters().size());
+    assertEquals(1, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).size());
+    assertEquals(0, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
 
     // Calling mapFn.apply() to verify the filter function is correctly applied to the input message
     SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
     SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
     DataContext dataContext = mock(DataContext.class);
-    when(mockContext.getExecutionContext()).thenReturn(executionContext);
-    when(mockContext.getDataContext()).thenReturn(dataContext);
+    when(mockTranslatorContext.getExecutionContext()).thenReturn(executionContext);
+    when(mockTranslatorContext.getDataContext()).thenReturn(dataContext);
     Object[] result = new Object[1];
     final Object mockFieldObj = new Object();
 
@@ -149,13 +162,19 @@ public class TestProjectTranslator extends TranslatorTestBase {
           this.add(mockFieldObj);
         }});
 
+    // Verify mapFn.apply() updates the TestMetricsRegistryImpl metrics
+    assertEquals(1, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
+
   }
 
   @Test
   public void testTranslateWithFlatten() throws IOException, ClassNotFoundException {
     // setup mock values to the constructor of ProjectTranslator
     LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
-    TranslatorContext mockContext = mock(TranslatorContext.class);
+    TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
+    Context mockContext = mock(Context.class);
+    ContainerContext mockContainerContext = mock(ContainerContext.class);
+    NoOpMetricsRegistry noOpMetricsRegistry = new NoOpMetricsRegistry();
     RelNode mockInput = mock(RelNode.class);
     List<RelNode> inputs = new ArrayList<>();
     inputs.add(mockInput);
@@ -198,21 +217,21 @@ public class TestProjectTranslator extends TranslatorTestBase {
     };
 
     MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp);
-    when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
-    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+    when(mockTranslatorContext.getMessageStream(eq(1))).thenReturn(mockStream);
+    doAnswer(this.getRegisterMessageStreamAnswer()).when(mockTranslatorContext).registerMessageStream(eq(2), any(MessageStream.class));
     RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
-    when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+    when(mockTranslatorContext.getExpressionCompiler()).thenReturn(mockCompiler);
     Expression mockExpr = mock(Expression.class);
     when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
 
     // Apply translate() method to verify that we are getting the correct map operator constructed
     ProjectTranslator projectTranslator = new ProjectTranslator(1);
-    projectTranslator.translate(mockProject, mockContext);
+    projectTranslator.translate(mockProject, LOGICAL_OP_ID, mockTranslatorContext);
     // make sure that context has been registered with LogicFilter and output message streams
-    verify(mockContext, times(1)).registerRelNode(2, mockProject);
-    verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
-    when(mockContext.getRelNode(2)).thenReturn(mockProject);
-    when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+    verify(mockTranslatorContext, times(1)).registerRelNode(2, mockProject);
+    verify(mockTranslatorContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+    when(mockTranslatorContext.getRelNode(2)).thenReturn(mockProject);
+    when(mockTranslatorContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
 
 
     Collection<OperatorSpec>
@@ -249,14 +268,15 @@ public class TestProjectTranslator extends TranslatorTestBase {
     assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
 
     // Verify that the describe() method will establish the context for the map function
-    Context context = mock(Context.class);
+    when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
+    when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(noOpMetricsRegistry);
     Map<Integer, TranslatorContext> mockContexts= new HashMap<>();
-    mockContexts.put(1, mockContext);
-    when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
-    projectSpec.getTransformFn().init(context);
+    mockContexts.put(1, mockTranslatorContext);
+    when(mockContext.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
+    projectSpec.getTransformFn().init(mockContext);
     MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
     assertNotNull(mapFn);
-    assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
+    assertEquals(mockTranslatorContext, Whitebox.getInternalState(mapFn, "translatorContext"));
     assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
     assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
 
@@ -264,8 +284,8 @@ public class TestProjectTranslator extends TranslatorTestBase {
     SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
     SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
     DataContext dataContext = mock(DataContext.class);
-    when(mockContext.getExecutionContext()).thenReturn(executionContext);
-    when(mockContext.getDataContext()).thenReturn(dataContext);
+    when(mockTranslatorContext.getExecutionContext()).thenReturn(executionContext);
+    when(mockTranslatorContext.getDataContext()).thenReturn(dataContext);
     Object[] result = new Object[1];
     final Object mockFieldObj = new Object();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 69f40d2..0c9091d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -508,9 +508,9 @@ public class TestQueryTranslator {
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_1", output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql0_join0", output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_1", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql0_join0", output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -520,9 +520,9 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PROFILE", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_1", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql0_join0", input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_1", input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql0_join0", input4PhysicalName);
   }
 
   @Test
@@ -576,9 +576,9 @@ public class TestQueryTranslator {
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-table_1", output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -588,9 +588,9 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PROFILE", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-table_1", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", input4PhysicalName);
   }
 
   @Test
@@ -643,9 +643,9 @@ public class TestQueryTranslator {
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-table_1", output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -655,9 +655,9 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PAGEVIEW", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-table_1", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", input4PhysicalName);
   }
 
   @Test