You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/11/20 22:05:52 UTC
samza git commit: SAMZA-1997: Samza-sql diagnostics - instrument
project operator
Repository: samza
Updated Branches:
refs/heads/master a6184f1e2 -> cce2b6f2e
SAMZA-1997: Samza-sql diagnostics - instrument project operator
When the user uses Samza-SQL, they use high level declarative language (SQL) for ease and speed of implementation of their Samza Job. Therefore, monitoring the job should provide metrics at this high/logical level. This is the goal of the Samza-SQL diagnostics project. In this first baby-step, we start with instrumenting the Project operator to provide run-time metrics.
Author: Shenoda Guirguis <sg...@sguirgui-ld2.linkedin.biz>
Reviewers: Srinivasulu Punuru <sp...@linkedin.com>, Aditya Toomula <at...@linkedin.com>
Closes #806 from shenodaguirguis/samza-sql-diagnostics
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cce2b6f2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cce2b6f2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cce2b6f2
Branch: refs/heads/master
Commit: cce2b6f2ec080f6e79dc0d85d82eba9443be9710
Parents: a6184f1
Author: Shenoda Guirguis <sg...@sguirgui-ld2.linkedin.biz>
Authored: Tue Nov 20 14:05:47 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Tue Nov 20 14:05:47 2018 -0800
----------------------------------------------------------------------
build.gradle | 1 +
.../apache/samza/metrics/SamzaHistogram.java | 79 +++++++++++++
.../consumer/EventHubSystemConsumer.java | 2 +-
.../system/eventhub/metrics/SamzaHistogram.java | 83 -------------
.../eventhub/producer/AsyncSystemProducer.java | 2 +-
.../samza/sql/translator/JoinTranslator.java | 10 +-
.../translator/LogicalAggregateTranslator.java | 8 +-
.../samza/sql/translator/ProjectTranslator.java | 65 +++++++++--
.../samza/sql/translator/QueryTranslator.java | 13 ++-
.../sql/testutil/TestMetricsRegistryImpl.java | 117 +++++++++++++++++++
.../sql/translator/TestJoinTranslator.java | 2 +-
.../sql/translator/TestProjectTranslator.java | 92 +++++++++------
.../sql/translator/TestQueryTranslator.java | 24 ++--
13 files changed, 339 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d0cd3ef..7011098 100644
--- a/build.gradle
+++ b/build.gradle
@@ -145,6 +145,7 @@ project(':samza-api') {
compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
compile "com.google.guava:guava:$guavaVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
+ compile "io.dropwizard.metrics:metrics-core:3.1.2"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java b/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
new file mode 100644
index 0000000..85835f9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Creates a {@link Histogram} metric using {@link ExponentiallyDecayingReservoir}
+ * Keeps a {@link Gauge} for each percentile
+ */
+public class SamzaHistogram {
+ private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D);
+ private final Histogram histogram;
+ private final List<Double> percentiles;
+ private final Map<Double, Gauge<Double>> gauges;
+
+ public SamzaHistogram(MetricsRegistry registry, String group, String name) {
+ this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES);
+ }
+
+ public SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
+ this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
+ this.percentiles = percentiles;
+ this.gauges = this.percentiles.stream()
+ .filter(x -> x > 0 && x <= 100)
+ .collect(Collectors.toMap(Function.identity(),
+ x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
+ }
+
+ public void update(long value) {
+ histogram.update(value);
+ }
+
+ public void updateGaugeValues(double percentile) {
+ Snapshot values = histogram.getSnapshot();
+ gauges.get(percentile).set(values.getValue(percentile / 100));
+ }
+
+ /**
+ * Custom gauge whose value is set based on the underlying Histogram
+ */
+ private class HistogramGauge extends Gauge<Double> {
+ private final Double percentile;
+
+ public HistogramGauge(Double percentile, String name, double value) {
+ super(name, value);
+ this.percentile = percentile;
+ }
+
+ public void visit(MetricsVisitor visitor) {
+ updateGaugeValues(percentile);
+ visitor.gauge(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index a05b5e2..df98d5b 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -55,7 +55,7 @@ import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
import org.apache.samza.system.eventhub.EventHubConfig;
import org.apache.samza.system.eventhub.Interceptor;
import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
-import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.apache.samza.util.Clock;
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
deleted file mode 100644
index 03fc114..0000000
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.eventhub.metrics;
-
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Snapshot;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.MetricsVisitor;
-
-
-/**
- * Creates a {@link Histogram} metric using {@link ExponentiallyDecayingReservoir}
- * Keeps a {@link Gauge} for each percentile
- */
-public class SamzaHistogram {
- private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D);
- private final Histogram histogram;
- private final List<Double> percentiles;
- private final Map<Double, Gauge<Double>> gauges;
-
- public SamzaHistogram(MetricsRegistry registry, String group, String name) {
- this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES);
- }
-
- public SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
- this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
- this.percentiles = percentiles;
- this.gauges = this.percentiles.stream()
- .filter(x -> x > 0 && x <= 100)
- .collect(Collectors.toMap(Function.identity(),
- x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
- }
-
- public void update(long value) {
- histogram.update(value);
- }
-
- public void updateGaugeValues(double percentile) {
- Snapshot values = histogram.getSnapshot();
- gauges.get(percentile).set(values.getValue(percentile / 100));
- }
-
- /**
- * Custom gauge whose value is set based on the underlying Histogram
- */
- private class HistogramGauge extends Gauge<Double> {
- private final Double percentile;
-
- public HistogramGauge(Double percentile, String name, double value) {
- super(name, value);
- this.percentile = percentile;
- }
-
- public void visit(MetricsVisitor visitor) {
- updateGaugeValues(percentile);
- visitor.gauge(this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
index 26b58d6..83d51ed 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
@@ -40,7 +40,7 @@ import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.eventhub.EventHubConfig;
-import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
+import org.apache.samza.metrics.SamzaHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 440d598..7b8ff58 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -74,12 +74,12 @@ import static org.apache.samza.sql.translator.SamzaSqlTableJoinFunction.*;
class JoinTranslator {
private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
- private int joinId;
+ private String logicalOpId;
private final String intermediateStreamPrefix;
private final int queryId;
- JoinTranslator(int joinId, String intermediateStreamPrefix, int queryId) {
- this.joinId = joinId;
+ JoinTranslator(String logicalOpId, String intermediateStreamPrefix, int queryId) {
+ this.logicalOpId = logicalOpId;
this.intermediateStreamPrefix = intermediateStreamPrefix + (intermediateStreamPrefix.isEmpty() ? "" : "_");
this.queryId = queryId;
}
@@ -156,7 +156,7 @@ class JoinTranslator {
inputStream
.partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds,
getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)), m -> m, KVSerde.of(keySerde, valueSerde),
- intermediateStreamPrefix + "stream_" + joinId)
+ intermediateStreamPrefix + "stream_" + logicalOpId)
.map(KV::getValue)
.join(table, joinFn);
}
@@ -363,7 +363,7 @@ class JoinTranslator {
relOutputStream
.partitionBy(m -> createSamzaSqlCompositeKey(m, tableKeyIds), m -> m,
- KVSerde.of(keySerde, valueSerde), intermediateStreamPrefix + "table_" + joinId)
+ KVSerde.of(keySerde, valueSerde), intermediateStreamPrefix + "table_" + logicalOpId)
.sendTo(table);
return table;
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
index 40a08ff..ac3d4e9 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
@@ -44,11 +44,11 @@ import org.slf4j.LoggerFactory;
class LogicalAggregateTranslator {
private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
- private int windowId;
+ private String logicalOpId;
private String changeLogStorePrefix;
- LogicalAggregateTranslator(int windowId, String changeLogStorePrefix) {
- this.windowId = windowId;
+ LogicalAggregateTranslator(String logicalOpId, String changeLogStorePrefix) {
+ this.logicalOpId = logicalOpId;
this.changeLogStorePrefix = changeLogStorePrefix + (changeLogStorePrefix.isEmpty() ? "" : "_");
}
@@ -72,7 +72,7 @@ class LogicalAggregateTranslator {
new SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde(),
new LongSerde())
.setAccumulationMode(
- AccumulationMode.DISCARDING), changeLogStorePrefix + "_tumblingWindow_" + windowId)
+ AccumulationMode.DISCARDING), changeLogStorePrefix + "_tumblingWindow_" + logicalOpId)
.map(windowPane -> {
List<String> fieldNames = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldNames();
List<Object> fieldValues = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldValues();
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 29b1935..990d7db 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -19,6 +19,9 @@
package org.apache.samza.sql.translator;
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -31,7 +34,11 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.samza.SamzaException;
+import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.sql.data.Expression;
@@ -53,39 +60,77 @@ class ProjectTranslator {
this.queryId = queryId;
}
- private static class ProjectMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
+ /**
+ * ProjectMapFunction implements MapFunction to map input SamzaSqlRelMessages, one at a time, to a new
+ * SamzaSqlRelMessage which consists of the projected fields
+ */
+ @VisibleForTesting
+ public static class ProjectMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
private transient Project project;
private transient Expression expr;
- private transient TranslatorContext context;
+ private transient TranslatorContext translatorContext;
+ private transient MetricsRegistry metricsRegistry;
+ private transient SamzaHistogram processingTime; // milli-seconds
+ private transient Counter numEvents;
private final int queryId;
private final int projectId;
+ private final String logicalOpId;
+ private final String PROCESSING_TIME_NAME = "processingTime";
+ private final String NUM_EVENTS_NAME = "numEvents";
- ProjectMapFunction(int projectId, int queryId) {
+ ProjectMapFunction(int projectId, int queryId, String logicalOpId) {
this.projectId = projectId;
this.queryId = queryId;
+ this.logicalOpId = logicalOpId;
}
+ /**
+ * initializes the ProjectMapFunction before any message is processed
+ * @param context the {@link Context} for this task
+ */
@Override
public void init(Context context) {
- this.context = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
- this.project = (Project) this.context.getRelNode(projectId);
- this.expr = this.context.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
+ this.translatorContext = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
+ this.project = (Project) this.translatorContext.getRelNode(projectId);
+ this.expr = this.translatorContext.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
+ ContainerContext containerContext = context.getContainerContext();
+ metricsRegistry = containerContext.getContainerMetricsRegistry();
+ processingTime = new SamzaHistogram(metricsRegistry, logicalOpId, PROCESSING_TIME_NAME);
+ numEvents = metricsRegistry.newCounter(logicalOpId, NUM_EVENTS_NAME);
+ numEvents.clear();
}
+ /**
+ * transforms the input message into the output message with projected fields
+ * @param message the input message to be transformed
+ * @return the new SamzaSqlRelMessage message
+ */
@Override
public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
+ Instant arrivalTime = Instant.now();
RelDataType type = project.getRowType();
Object[] output = new Object[type.getFieldCount()];
- expr.execute(context.getExecutionContext(), context.getDataContext(),
+ expr.execute(translatorContext.getExecutionContext(), translatorContext.getDataContext(),
message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
List<String> names = new ArrayList<>();
for (int index = 0; index < output.length; index++) {
names.add(index, project.getNamedProjects().get(index).getValue());
}
-
+ updateMetrics(arrivalTime, Instant.now());
return new SamzaSqlRelMessage(names, Arrays.asList(output));
}
+
+ /**
+ * Updates the Diagnostics Metrics (processing time and number of events)
+ * @param arrivalTime input message arrival time (= beging of processing in this operator)
+ * @param outputTime output message output time (=end of processing in this operator)
+ */
+ private void updateMetrics(Instant arrivalTime, Instant outputTime) {
+ numEvents.inc();
+ processingTime.update(Duration.between(arrivalTime, outputTime).toNanos() / 1000L);
+ }
+
}
private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer flattenIndex,
@@ -116,7 +161,7 @@ class ProjectTranslator {
return ((RexInputRef) ((RexCall) rexNode).getOperands().get(0)).getIndex();
}
- void translate(final Project project, final TranslatorContext context) {
+ void translate(final Project project, final String logicalOpId, final TranslatorContext context) {
MessageStream<SamzaSqlRelMessage> messageStream = context.getMessageStream(project.getInput().getId());
List<Integer> flattenProjects =
project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList());
@@ -132,7 +177,7 @@ class ProjectTranslator {
final int projectId = project.getId();
- MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(new ProjectMapFunction(projectId, queryId));
+ MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(new ProjectMapFunction(projectId, queryId, logicalOpId));
context.registerMessageStream(project.getId(), outputStream);
context.registerRelNode(project.getId(), project);
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 33781a6..bb34a41 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -45,7 +45,6 @@ import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
@@ -131,6 +130,7 @@ public class QueryTranslator {
node.accept(new RelShuttleImpl() {
int windowId = 0;
int joinId = 0;
+ int opId = 0;
@Override
public RelNode visit(RelNode relNode) {
@@ -166,15 +166,16 @@ public class QueryTranslator {
@Override
public RelNode visit(LogicalProject project) {
RelNode node = super.visit(project);
- new ProjectTranslator(queryId).translate(project, translatorContext);
+ String logicalOpId = "sql" + Integer.toString(queryId) + "_project" + Integer.toString(opId++);
+ new ProjectTranslator(queryId).translate(project, logicalOpId, translatorContext);
return node;
}
@Override
public RelNode visit(LogicalJoin join) {
RelNode node = super.visit(join);
- joinId++;
- new JoinTranslator(joinId, sqlConfig.getMetadataTopicPrefix(), queryId)
+ String logicalOpId = "sql" + Integer.toString(queryId) + "_join" + Integer.toString(opId++);
+ new JoinTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix(), queryId)
.translate(join, translatorContext);
return node;
}
@@ -182,8 +183,8 @@ public class QueryTranslator {
@Override
public RelNode visit(LogicalAggregate aggregate) {
RelNode node = super.visit(aggregate);
- windowId++;
- new LogicalAggregateTranslator(windowId, sqlConfig.getMetadataTopicPrefix())
+ String logicalOpId = "sql" + Integer.toString(queryId) + "_window" + Integer.toString(opId++);
+ new LogicalAggregateTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix())
.translate(aggregate, translatorContext);
return node;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
new file mode 100644
index 0000000..618ca50
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestMetricsRegistryImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.testutil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.metrics.Timer;
+
+
+/**
+ * TestMetricsRegistryImpl implements the MetricRegistry interface and adds get APIs
+ * for testing Translators.
+ */
+public class TestMetricsRegistryImpl implements org.apache.samza.metrics.MetricsRegistry {
+ private Map<String, List<Counter>> counters = new HashMap<>();
+ private Map<String, List<Timer>> timers = new HashMap<>();
+ private Map<String, List<Gauge<?>>> gauges = new HashMap<>();
+ private Map<String, List<ListGauge>> listGauges = new HashMap<>();
+
+ @Override
+ public Counter newCounter(String group, String name) {
+ Counter counter = new Counter(name);
+ return newCounter(group, counter);
+ }
+
+ @Override
+ public Counter newCounter(String group, Counter counter) {
+ if (!counters.containsKey(group)) {
+ counters.put(group, new ArrayList<>());
+ }
+ counters.get(group).add(counter);
+ return counter;
+ }
+
+ /**
+ * retrieves the Map of Counters
+ * @return counters
+ */
+ public Map<String, List<Counter>> getCounters() {
+ return counters;
+ }
+
+ @Override
+ public Timer newTimer(String group, String name) {
+ Timer timer = new Timer(name);
+ return newTimer(group, timer);
+ }
+
+ @Override
+ public Timer newTimer(String group, Timer timer) {
+ if (!timers.containsKey(group)) {
+ timers.put(group, new ArrayList<>());
+ }
+ timers.get(group).add(timer);
+ return timer;
+ }
+
+ /**
+ * retrieves the Map of Timers
+ * @return timers
+ */
+ public Map<String, List<Timer>> getTimers() {
+ return timers;
+ }
+
+ @Override
+ public <T> Gauge<T> newGauge(String group, String name, T value) {
+ Gauge<T> gauge = new Gauge<>(name, value);
+ return newGauge(group, gauge);
+ }
+
+ @Override
+ public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
+ if (!gauges.containsKey(group)) {
+ gauges.put(group, new ArrayList<>());
+ }
+ gauges.get(group).add(gauge);
+ return gauge;
+ }
+
+ /**
+ * retrieves the Map of Gauges
+ * @return gauges
+ */
+ public Map<String, List<Gauge<?>>> getGauges() {
+ return gauges;
+ }
+
+ @Override
+ public ListGauge newListGauge(String group, ListGauge listGauge) {
+ listGauges.putIfAbsent(group, new ArrayList());
+ listGauges.get(group).add(listGauge);
+ return listGauge;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index d496982..85fe580 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -195,7 +195,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));
// Apply translate() method to verify that we are getting the correct map operator constructed
- JoinTranslator joinTranslator = new JoinTranslator(3, "", 0);
+ JoinTranslator joinTranslator = new JoinTranslator("sql0_join3", "", 0);
joinTranslator.translate(mockJoin, mockContext);
// make sure that context has been registered with LogicFilter and output message streams
verify(mockContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 3771dbb..b6b2ddd 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.calcite.util.Pair;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
+import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
@@ -47,16 +48,15 @@ import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
+import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
+import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
@@ -72,11 +72,16 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(LogicalProject.class)
public class TestProjectTranslator extends TranslatorTestBase {
+ final private String LOGICAL_OP_ID = "sql0_project_0";
@Test
public void testTranslate() throws IOException, ClassNotFoundException {
// setup mock values to the constructor of FilterTranslator
LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
- TranslatorContext mockContext = mock(TranslatorContext.class);
+ Context mockContext = mock(Context.class);
+ ContainerContext mockContainerContext = mock(ContainerContext.class);
+ TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
+ TestMetricsRegistryImpl testMetricsRegistryImpl = new TestMetricsRegistryImpl();
+
RelNode mockInput = mock(RelNode.class);
List<RelNode> inputs = new ArrayList<>();
inputs.add(mockInput);
@@ -94,43 +99,51 @@ public class TestProjectTranslator extends TranslatorTestBase {
StreamApplicationDescriptorImpl mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = mock(OperatorSpec.class);
MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp);
- when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
- doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+ when(mockTranslatorContext.getMessageStream(eq(1))).thenReturn(mockStream);
+ doAnswer(this.getRegisterMessageStreamAnswer()).when(mockTranslatorContext).registerMessageStream(eq(2), any(MessageStream.class));
RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
- when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+ when(mockTranslatorContext.getExpressionCompiler()).thenReturn(mockCompiler);
Expression mockExpr = mock(Expression.class);
when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
+ when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
+ when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(testMetricsRegistryImpl);
// Apply translate() method to verify that we are getting the correct map operator constructed
ProjectTranslator projectTranslator = new ProjectTranslator(1);
- projectTranslator.translate(mockProject, mockContext);
+ projectTranslator.translate(mockProject, LOGICAL_OP_ID, mockTranslatorContext);
// make sure that context has been registered with LogicFilter and output message streams
- verify(mockContext, times(1)).registerRelNode(2, mockProject);
- verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
- when(mockContext.getRelNode(2)).thenReturn(mockProject);
- when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+ verify(mockTranslatorContext, times(1)).registerRelNode(2, mockProject);
+ verify(mockTranslatorContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+ when(mockTranslatorContext.getRelNode(2)).thenReturn(mockProject);
+ when(mockTranslatorContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
StreamOperatorSpec projectSpec = (StreamOperatorSpec) Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
assertNotNull(projectSpec);
assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
// Verify that the bootstrap() method will establish the context for the map function
- Context context = mock(Context.class);
Map<Integer, TranslatorContext> mockContexts= new HashMap<>();
- mockContexts.put(1, mockContext);
- when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
- projectSpec.getTransformFn().init(context);
+ mockContexts.put(1, mockTranslatorContext);
+ when(mockContext.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
+ projectSpec.getTransformFn().init(mockContext);
MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
+
assertNotNull(mapFn);
- assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
+ assertEquals(mockTranslatorContext, Whitebox.getInternalState(mapFn, "translatorContext"));
assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
+ // Verify TestMetricsRegistryImpl works with Project
+ assertEquals(1, testMetricsRegistryImpl.getGauges().size());
+ assertEquals(2, testMetricsRegistryImpl.getGauges().get(LOGICAL_OP_ID).size());
+ assertEquals(1, testMetricsRegistryImpl.getCounters().size());
+ assertEquals(1, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).size());
+ assertEquals(0, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
// Calling mapFn.apply() to verify the filter function is correctly applied to the input message
SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
DataContext dataContext = mock(DataContext.class);
- when(mockContext.getExecutionContext()).thenReturn(executionContext);
- when(mockContext.getDataContext()).thenReturn(dataContext);
+ when(mockTranslatorContext.getExecutionContext()).thenReturn(executionContext);
+ when(mockTranslatorContext.getDataContext()).thenReturn(dataContext);
Object[] result = new Object[1];
final Object mockFieldObj = new Object();
@@ -149,13 +162,19 @@ public class TestProjectTranslator extends TranslatorTestBase {
this.add(mockFieldObj);
}});
+ // Verify mapFn.apply() updates the TestMetricsRegistryImpl metrics
+ assertEquals(1, testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
+
}
@Test
public void testTranslateWithFlatten() throws IOException, ClassNotFoundException {
// setup mock values to the constructor of ProjectTranslator
LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
- TranslatorContext mockContext = mock(TranslatorContext.class);
+ TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
+ Context mockContext = mock(Context.class);
+ ContainerContext mockContainerContext = mock(ContainerContext.class);
+ NoOpMetricsRegistry noOpMetricsRegistry = new NoOpMetricsRegistry();
RelNode mockInput = mock(RelNode.class);
List<RelNode> inputs = new ArrayList<>();
inputs.add(mockInput);
@@ -198,21 +217,21 @@ public class TestProjectTranslator extends TranslatorTestBase {
};
MessageStream<SamzaSqlRelMessage> mockStream = new MessageStreamImpl<>(mockAppDesc, mockInputOp);
- when(mockContext.getMessageStream(eq(1))).thenReturn(mockStream);
- doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(2), any(MessageStream.class));
+ when(mockTranslatorContext.getMessageStream(eq(1))).thenReturn(mockStream);
+ doAnswer(this.getRegisterMessageStreamAnswer()).when(mockTranslatorContext).registerMessageStream(eq(2), any(MessageStream.class));
RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
- when(mockContext.getExpressionCompiler()).thenReturn(mockCompiler);
+ when(mockTranslatorContext.getExpressionCompiler()).thenReturn(mockCompiler);
Expression mockExpr = mock(Expression.class);
when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
// Apply translate() method to verify that we are getting the correct map operator constructed
ProjectTranslator projectTranslator = new ProjectTranslator(1);
- projectTranslator.translate(mockProject, mockContext);
+ projectTranslator.translate(mockProject, LOGICAL_OP_ID, mockTranslatorContext);
// make sure that context has been registered with LogicFilter and output message streams
- verify(mockContext, times(1)).registerRelNode(2, mockProject);
- verify(mockContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
- when(mockContext.getRelNode(2)).thenReturn(mockProject);
- when(mockContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
+ verify(mockTranslatorContext, times(1)).registerRelNode(2, mockProject);
+ verify(mockTranslatorContext, times(1)).registerMessageStream(2, this.getRegisteredMessageStream(2));
+ when(mockTranslatorContext.getRelNode(2)).thenReturn(mockProject);
+ when(mockTranslatorContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
Collection<OperatorSpec>
@@ -249,14 +268,15 @@ public class TestProjectTranslator extends TranslatorTestBase {
assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
// Verify that the describe() method will establish the context for the map function
- Context context = mock(Context.class);
+ when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
+ when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(noOpMetricsRegistry);
Map<Integer, TranslatorContext> mockContexts= new HashMap<>();
- mockContexts.put(1, mockContext);
- when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
- projectSpec.getTransformFn().init(context);
+ mockContexts.put(1, mockTranslatorContext);
+ when(mockContext.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContexts));
+ projectSpec.getTransformFn().init(mockContext);
MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
assertNotNull(mapFn);
- assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
+ assertEquals(mockTranslatorContext, Whitebox.getInternalState(mapFn, "translatorContext"));
assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
@@ -264,8 +284,8 @@ public class TestProjectTranslator extends TranslatorTestBase {
SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new ArrayList<>(), new ArrayList<>());
SamzaSqlExecutionContext executionContext = mock(SamzaSqlExecutionContext.class);
DataContext dataContext = mock(DataContext.class);
- when(mockContext.getExecutionContext()).thenReturn(executionContext);
- when(mockContext.getDataContext()).thenReturn(dataContext);
+ when(mockTranslatorContext.getExecutionContext()).thenReturn(executionContext);
+ when(mockTranslatorContext.getDataContext()).thenReturn(dataContext);
Object[] result = new Object[1];
final Object mockFieldObj = new Object();
http://git-wip-us.apache.org/repos/asf/samza/blob/cce2b6f2/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 69f40d2..0c9091d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -508,9 +508,9 @@ public class TestQueryTranslator {
Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_1", output1PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql0_join0", output1PhysicalName);
Assert.assertEquals("kafka", output2System);
- Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_1", output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql0_join0", output2PhysicalName);
Assert.assertEquals("testavro", output3System);
Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
@@ -520,9 +520,9 @@ public class TestQueryTranslator {
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PROFILE", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_1", input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql0_join0", input3PhysicalName);
Assert.assertEquals("kafka", input4System);
- Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_1", input4PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql0_join0", input4PhysicalName);
}
@Test
@@ -576,9 +576,9 @@ public class TestQueryTranslator {
Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-table_1", output1PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", output1PhysicalName);
Assert.assertEquals("kafka", output2System);
- Assert.assertEquals("sql-job-1-partition_by-stream_1", output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", output2PhysicalName);
Assert.assertEquals("testavro", output3System);
Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
@@ -588,9 +588,9 @@ public class TestQueryTranslator {
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PROFILE", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-table_1", input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", input3PhysicalName);
Assert.assertEquals("kafka", input4System);
- Assert.assertEquals("sql-job-1-partition_by-stream_1", input4PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", input4PhysicalName);
}
@Test
@@ -643,9 +643,9 @@ public class TestQueryTranslator {
Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-table_1", output1PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", output1PhysicalName);
Assert.assertEquals("kafka", output2System);
- Assert.assertEquals("sql-job-1-partition_by-stream_1", output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", output2PhysicalName);
Assert.assertEquals("testavro", output3System);
Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
@@ -655,9 +655,9 @@ public class TestQueryTranslator {
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PAGEVIEW", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-table_1", input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql0_join0", input3PhysicalName);
Assert.assertEquals("kafka", input4System);
- Assert.assertEquals("sql-job-1-partition_by-stream_1", input4PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql0_join0", input4PhysicalName);
}
@Test