You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2018/12/07 23:05:55 UTC
samza git commit: SAMZA-2028: Samza-SQL Diagnostics: add metrics to
Join and Aggregate operators
Repository: samza
Updated Branches:
refs/heads/master df56f2dc1 -> 970dfe57e
SAMZA-2028: Samza-SQL Diagnostics: add metrics to Join and Aggregate operators
By adding metrics to the Join and Aggregate operators, this commit concludes the first phase (adding metrics) of Samza-SQL Diagnostics.
Author: Shenoda Guirguis <sg...@linkedin.com>
Reviewers: atoomula
Closes #848 from shenodaguirguis/joinmetrics
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/970dfe57
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/970dfe57
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/970dfe57
Branch: refs/heads/master
Commit: 970dfe57e5993c93d62cfb50b71167acc7596cad
Parents: df56f2d
Author: Shenoda Guirguis <sg...@linkedin.com>
Authored: Fri Dec 7 15:05:43 2018 -0800
Committer: Aditya Toomula <at...@linkedin.com>
Committed: Fri Dec 7 15:05:43 2018 -0800
----------------------------------------------------------------------
.../samza/sql/data/SamzaSqlRelMsgMetadata.java | 6 ++
.../samza/sql/translator/JoinTranslator.java | 21 +++++-
.../translator/LogicalAggregateTranslator.java | 2 +
.../samza/sql/translator/QueryTranslator.java | 1 -
.../TranslatorInputMetricsMapFunction.java | 67 ++++++++++++++++++
.../TranslatorOutputMetricsMapFunction.java | 71 ++++++++++++++++++++
.../sql/translator/TestJoinTranslator.java | 30 ++++++++-
7 files changed, 192 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
index 1bdd5f6..713ecbe 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
@@ -36,6 +36,12 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
public boolean isNewInputMessage = true;
/**
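+ * The instant at which the current operator began processing this message, stored as an ISO-8601 string (Instant.toString())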
+ */
+ public String operatorBeginProcessingInstant = null;
+
+
+ /**
* The timestamp of when the events actually happened
* set by and copied from the event source
* TODO: copy eventTime through from source to RelMessage
http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 3a858f8..c99551e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -19,11 +19,11 @@
package org.apache.samza.sql.translator;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.Objects;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
@@ -55,7 +55,6 @@ import org.slf4j.LoggerFactory;
import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames;
import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey;
-import static org.apache.samza.sql.translator.SamzaSqlTableJoinFunction.*;
/**
@@ -77,11 +76,15 @@ class JoinTranslator {
private String logicalOpId;
private final String intermediateStreamPrefix;
private final int queryId;
+ private final TranslatorInputMetricsMapFunction inputMetricsMF;
+ private final TranslatorOutputMetricsMapFunction outputMetricsMF;
JoinTranslator(String logicalOpId, String intermediateStreamPrefix, int queryId) {
this.logicalOpId = logicalOpId;
this.intermediateStreamPrefix = intermediateStreamPrefix + (intermediateStreamPrefix.isEmpty() ? "" : "_");
this.queryId = queryId;
+ this.inputMetricsMF = new TranslatorInputMetricsMapFunction(logicalOpId);
+ this.outputMetricsMF = new TranslatorOutputMetricsMapFunction(logicalOpId);
}
void translate(final LogicalJoin join, final TranslatorContext translatorContext) {
@@ -116,6 +119,8 @@ class JoinTranslator {
joinStreamWithTable(inputStream, table, streamNode, tableNode, join, translatorContext);
translatorContext.registerMessageStream(join.getId(), outputStream);
+
+ outputStream.map(outputMetricsMF);
}
private MessageStream<SamzaSqlRelMessage> joinStreamWithTable(MessageStream<SamzaSqlRelMessage> inputStream,
@@ -137,7 +142,9 @@ class JoinTranslator {
StreamTableJoinFunction joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId);
- return inputStream.join(table, joinFn);
+ return inputStream
+ .map(inputMetricsMF)
+ .join(table, joinFn);
}
// Join with the local table
@@ -154,6 +161,7 @@ class JoinTranslator {
// the names from the stream as the lookup needs to be done based on what is stored in the local table.
return
inputStream
+ .map(inputMetricsMF)
.partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds,
getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)), m -> m, KVSerde.of(keySerde, valueSerde),
intermediateStreamPrefix + "stream_" + logicalOpId)
@@ -368,4 +376,11 @@ class JoinTranslator {
return table;
}
+
+ @VisibleForTesting
+ public TranslatorInputMetricsMapFunction getInputMetricsMF() { return this.inputMetricsMF; }
+
+ @VisibleForTesting
+ public TranslatorOutputMetricsMapFunction getOutputMetricsMF() { return this.outputMetricsMF; }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
index 357d7e6..aaa6a9e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
@@ -67,6 +67,7 @@ class LogicalAggregateTranslator {
MessageStream<SamzaSqlRelMessage> outputStream =
inputStream
+ .map(new TranslatorInputMetricsMapFunction(logicalOpId))
.window(Windows.keyedTumblingWindow(m -> m,
Duration.ofMillis(context.getExecutionContext().getSamzaSqlApplicationConfig().getWindowDurationMs()),
initialValue,
@@ -83,6 +84,7 @@ class LogicalAggregateTranslator {
return new SamzaSqlRelMessage(fieldNames, fieldValues, new SamzaSqlRelMsgMetadata("", "", ""));
});
context.registerMessageStream(aggregate.getId(), outputStream);
+ outputStream.map(new TranslatorOutputMetricsMapFunction(logicalOpId));
}
private ArrayList<String> getAggFieldNames(LogicalAggregate aggregate) {
http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 13d0a25..c8d1edf 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -220,7 +220,6 @@ public class QueryTranslator {
/* update input metrics */
String queryLogicalId = String.format(TranslatorConstants.LOGSQLID_TEMPLATE, queryId);
- //new InputMetricsMapFunction(queryLogicalId));
opId = 0;
http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
new file mode 100644
index 0000000..bb3300a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.translator;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Instant;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+
+
+/**
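+ * TranslatorInputMetricsMapFunction is a pass-through map function that maintains operator (e.g. join, aggregate) metrics at input: it counts input events and stamps each message with its begin-processing instant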
+ */
+class TranslatorInputMetricsMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
+ private transient MetricsRegistry metricsRegistry;
+ private transient Counter inputEvents;
+
+ private final String logicalOpId;
+
+ TranslatorInputMetricsMapFunction(String logicalOpId) { this.logicalOpId = logicalOpId; }
+
+ /**
+ * initializes the TranslatorInputMetricsMapFunction before any message is processed
+ * @param context the {@link Context} for this task
+ */
+ @Override
+ public void init(Context context) {
+ ContainerContext containerContext = context.getContainerContext();
+ metricsRegistry = containerContext.getContainerMetricsRegistry();
+ inputEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.INPUT_EVENTS_NAME);
+ inputEvents.clear();
+ }
+
+ /**
+ * update metrics given a message
+ * @param message the input message
+ * @return the same message
+ */
+ @Override
+ public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
+ inputEvents.inc();
+ message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant = Instant.now().toString();
+ return message;
+ }
+
+} // TranslatorInputMetricsMapFunction
http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
new file mode 100644
index 0000000..f1757fb
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.translator;
+
+import java.time.Duration;
+import java.time.Instant;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SamzaHistogram;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+
+
+/**
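+ * TranslatorOutputMetricsMapFunction is a pass-through map function that maintains operator (e.g. join, aggregate) metrics at output: it counts output events and records per-message processing time in milliseconds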
+ */
+class TranslatorOutputMetricsMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
+ private transient MetricsRegistry metricsRegistry;
+ private transient SamzaHistogram processingTime; // milli-seconds
+ private transient Counter outputEvents;
+
+ private final String logicalOpId;
+
+ TranslatorOutputMetricsMapFunction(String logicalOpId) { this.logicalOpId = logicalOpId; }
+
+ /**
+ * initializes the TranslatorOutputMetricsMapFunction before any message is processed
+ * @param context the {@link Context} for this task
+ */
+ @Override
+ public void init(Context context) {
+ ContainerContext containerContext = context.getContainerContext();
+ metricsRegistry = containerContext.getContainerMetricsRegistry();
+ processingTime = new SamzaHistogram(metricsRegistry, logicalOpId, TranslatorConstants.PROCESSING_TIME_NAME);
+ outputEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.OUTPUT_EVENTS_NAME);
+ outputEvents.clear();
+ }
+
+ /**
+ * update metrics given a message
+ * @param message the input message
+ * @return the same message
+ */
+ @Override
+ public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
+ Instant endProcessing = Instant.now();
+ Instant beginProcessing = Instant.parse(message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant);
+ outputEvents.inc();
+ processingTime.update(Duration.between(beginProcessing, endProcessing).toMillis());
+ return message;
+ }
+} // TranslatorOutputMetricsMapFunction
http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index 090fb90..233fca4 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -36,10 +36,15 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
+import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
@@ -91,6 +96,8 @@ public class TestJoinTranslator extends TranslatorTestBase {
private void testTranslateStreamToTableJoin(boolean isRemoteTable) throws IOException, ClassNotFoundException {
// setup mock values to the constructor of JoinTranslator
+ final String logicalOpId = "sql0_join3";
+ final int queryId = 0;
LogicalJoin mockJoin = PowerMockito.mock(LogicalJoin.class);
TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
RelNode mockLeftInput = PowerMockito.mock(EnumerableTableScan.class);
@@ -194,9 +201,27 @@ public class TestJoinTranslator extends TranslatorTestBase {
when(ssConfigBySource.get(String.join(".", qualifiedTableName))).thenReturn(mockIOConfig);
when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));
+ JoinTranslator joinTranslator = new JoinTranslator(logicalOpId, "", queryId);
+
+ // Verify Metrics Works with Join
+ Context mockContext = mock(Context.class);
+ ContainerContext mockContainerContext = mock(ContainerContext.class);
+ TestMetricsRegistryImpl testMetricsRegistryImpl = new TestMetricsRegistryImpl();
+ when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
+ when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(testMetricsRegistryImpl);
+ TranslatorInputMetricsMapFunction inputMetricsMF = joinTranslator.getInputMetricsMF();
+ assertNotNull(inputMetricsMF);
+ inputMetricsMF.init(mockContext);
+ TranslatorOutputMetricsMapFunction outputMetricsMF = joinTranslator.getOutputMetricsMF();
+ assertNotNull(outputMetricsMF);
+ outputMetricsMF.init(mockContext);
+ assertEquals(1, testMetricsRegistryImpl.getCounters().size());
+ assertEquals(2, testMetricsRegistryImpl.getCounters().get(logicalOpId).size());
+ assertEquals(0, testMetricsRegistryImpl.getCounters().get(logicalOpId).get(0).getCount());
+ assertEquals(0, testMetricsRegistryImpl.getCounters().get(logicalOpId).get(1).getCount());
+ assertEquals(1, testMetricsRegistryImpl.getGauges().size());
+
// Apply translate() method to verify that we are getting the correct map operator constructed
- String logicalOpId = "sql0_join3";
- JoinTranslator joinTranslator = new JoinTranslator(logicalOpId, "", 0);
joinTranslator.translate(mockJoin, mockTranslatorContext);
// make sure that context has been registered with LogicFilter and output message streams
verify(mockTranslatorContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));
@@ -210,6 +235,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
// Verify joinSpec has the corresponding setup
StreamTableJoinFunction joinFn = joinSpec.getJoinFn();
assertNotNull(joinFn);
+
if (isRemoteTable) {
assertTrue(joinFn instanceof SamzaSqlRemoteTableJoinFunction);
} else {