You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2018/12/07 23:05:55 UTC

samza git commit: SAMZA-2028: Samza-SQL Diagnostics: add metrics to Join and Aggregate operators

Repository: samza
Updated Branches:
  refs/heads/master df56f2dc1 -> 970dfe57e


SAMZA-2028: Samza-SQL Diagnostics: add metrics to Join and Aggregate operators

By adding metrics to the Join and Aggregate operators, this commit concludes the first phase (adding metrics) of Samza-SQL Diagnostics.

Author: Shenoda Guirguis <sg...@linkedin.com>

Reviewers: atoomula

Closes #848 from shenodaguirguis/joinmetrics


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/970dfe57
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/970dfe57
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/970dfe57

Branch: refs/heads/master
Commit: 970dfe57e5993c93d62cfb50b71167acc7596cad
Parents: df56f2d
Author: Shenoda Guirguis <sg...@linkedin.com>
Authored: Fri Dec 7 15:05:43 2018 -0800
Committer: Aditya Toomula <at...@linkedin.com>
Committed: Fri Dec 7 15:05:43 2018 -0800

----------------------------------------------------------------------
 .../samza/sql/data/SamzaSqlRelMsgMetadata.java  |  6 ++
 .../samza/sql/translator/JoinTranslator.java    | 21 +++++-
 .../translator/LogicalAggregateTranslator.java  |  2 +
 .../samza/sql/translator/QueryTranslator.java   |  1 -
 .../TranslatorInputMetricsMapFunction.java      | 67 ++++++++++++++++++
 .../TranslatorOutputMetricsMapFunction.java     | 71 ++++++++++++++++++++
 .../sql/translator/TestJoinTranslator.java      | 30 ++++++++-
 7 files changed, 192 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
index 1bdd5f6..713ecbe 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
@@ -36,6 +36,12 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
   public boolean isNewInputMessage = true;
 
   /**
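+   * Instant (in Instant.toString() form) at which the current operator began processing this message; null until set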
+   */
+  public String operatorBeginProcessingInstant = null;
+
+
+  /**
    * The timestamp of when the events actually happened
    * set by and copied from the event source
    * TODO: copy eventTime through from source to RelMessage

http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 3a858f8..c99551e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -19,11 +19,11 @@
 
 package org.apache.samza.sql.translator;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
-import java.util.Objects;
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
@@ -55,7 +55,6 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames;
 import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey;
-import static org.apache.samza.sql.translator.SamzaSqlTableJoinFunction.*;
 
 
 /**
@@ -77,11 +76,15 @@ class JoinTranslator {
   private String logicalOpId;
   private final String intermediateStreamPrefix;
   private final int queryId;
+  private final TranslatorInputMetricsMapFunction inputMetricsMF;
+  private final TranslatorOutputMetricsMapFunction outputMetricsMF;
 
   JoinTranslator(String logicalOpId, String intermediateStreamPrefix, int queryId) {
     this.logicalOpId = logicalOpId;
     this.intermediateStreamPrefix = intermediateStreamPrefix + (intermediateStreamPrefix.isEmpty() ? "" : "_");
     this.queryId = queryId;
+    this.inputMetricsMF = new TranslatorInputMetricsMapFunction(logicalOpId);
+    this.outputMetricsMF = new TranslatorOutputMetricsMapFunction(logicalOpId);
   }
 
   void translate(final LogicalJoin join, final TranslatorContext translatorContext) {
@@ -116,6 +119,8 @@ class JoinTranslator {
         joinStreamWithTable(inputStream, table, streamNode, tableNode, join, translatorContext);
 
     translatorContext.registerMessageStream(join.getId(), outputStream);
+
+    outputStream.map(outputMetricsMF);
   }
 
   private MessageStream<SamzaSqlRelMessage> joinStreamWithTable(MessageStream<SamzaSqlRelMessage> inputStream,
@@ -137,7 +142,9 @@ class JoinTranslator {
       StreamTableJoinFunction joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
           context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId);
 
-      return inputStream.join(table, joinFn);
+      return inputStream
+          .map(inputMetricsMF)
+          .join(table, joinFn);
     }
 
     // Join with the local table
@@ -154,6 +161,7 @@ class JoinTranslator {
     // the names from the stream as the lookup needs to be done based on what is stored in the local table.
     return
         inputStream
+            .map(inputMetricsMF)
             .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds,
             getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)), m -> m, KVSerde.of(keySerde, valueSerde),
             intermediateStreamPrefix + "stream_" + logicalOpId)
@@ -368,4 +376,11 @@ class JoinTranslator {
 
     return table;
   }
+
+  @VisibleForTesting
+  public TranslatorInputMetricsMapFunction getInputMetricsMF() { return this.inputMetricsMF; }
+
+  @VisibleForTesting
+  public TranslatorOutputMetricsMapFunction getOutputMetricsMF() { return this.outputMetricsMF; }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
index 357d7e6..aaa6a9e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
@@ -67,6 +67,7 @@ class LogicalAggregateTranslator {
 
     MessageStream<SamzaSqlRelMessage> outputStream =
         inputStream
+            .map(new TranslatorInputMetricsMapFunction(logicalOpId))
             .window(Windows.keyedTumblingWindow(m -> m,
                 Duration.ofMillis(context.getExecutionContext().getSamzaSqlApplicationConfig().getWindowDurationMs()),
                 initialValue,
@@ -83,6 +84,7 @@ class LogicalAggregateTranslator {
                 return new SamzaSqlRelMessage(fieldNames, fieldValues, new SamzaSqlRelMsgMetadata("", "", ""));
               });
     context.registerMessageStream(aggregate.getId(), outputStream);
+    outputStream.map(new TranslatorOutputMetricsMapFunction(logicalOpId));
   }
 
   private ArrayList<String> getAggFieldNames(LogicalAggregate aggregate) {

http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 13d0a25..c8d1edf 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -220,7 +220,6 @@ public class QueryTranslator {
 
     /* update input metrics */
     String queryLogicalId = String.format(TranslatorConstants.LOGSQLID_TEMPLATE, queryId);
-    //new InputMetricsMapFunction(queryLogicalId));
 
     opId = 0;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
new file mode 100644
index 0000000..bb3300a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.translator;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Instant;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+
+
+/**
+ * Pass-through map function that counts an operator's input messages and timestamps the start of their processing
+ */
+class TranslatorInputMetricsMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
+  private final String logicalOpId;
+
+  private transient MetricsRegistry metricsRegistry;
+  private transient Counter inputEvents;
+
+  TranslatorInputMetricsMapFunction(String logicalOpId) { this.logicalOpId = logicalOpId; }
+
+  /**
+   * Registers the input-events counter for this operator in the container metrics registry and clears it
+   * @param context the {@link Context} for this task
+   */
+  @Override
+  public void init(Context context) {
+    ContainerContext container = context.getContainerContext();
+    metricsRegistry = container.getContainerMetricsRegistry();
+    inputEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.INPUT_EVENTS_NAME);
+    inputEvents.clear();
+  }
+
+  /**
+   * Increments the input counter and stores the current instant, as a string, in the message metadata
+   * @param message  the input message
+   * @return the same message
+   */
+  @Override
+  public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
+    inputEvents.inc();
+    String beganAt = Instant.now().toString();
+    message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant = beganAt;
+    return message;
+  }
+} // TranslatorInputMetricsMapFunction

http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
new file mode 100644
index 0000000..f1757fb
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.translator;
+
+import java.time.Duration;
+import java.time.Instant;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SamzaHistogram;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+
+
+/**
+ * Pass-through map function that maintains an operator's output metrics: output event count and processing time
+ */
+class TranslatorOutputMetricsMapFunction implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
+  private transient MetricsRegistry metricsRegistry;
+  private transient SamzaHistogram processingTime; // milli-seconds
+  private transient Counter outputEvents;
+  private final String logicalOpId;
+
+  TranslatorOutputMetricsMapFunction(String logicalOpId) { this.logicalOpId = logicalOpId; }
+
+  /**
+   * Registers the processing-time histogram and the output-events counter for this operator and clears the counter
+   * @param context the {@link Context} for this task
+   */
+  @Override
+  public void init(Context context) {
+    ContainerContext containerContext = context.getContainerContext();
+    metricsRegistry = containerContext.getContainerMetricsRegistry();
+    processingTime = new SamzaHistogram(metricsRegistry, logicalOpId, TranslatorConstants.PROCESSING_TIME_NAME);
+    outputEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.OUTPUT_EVENTS_NAME);
+    outputEvents.clear();
+  }
+
+  /**
+   * Counts the output message and, when its begin-processing instant is set, records the elapsed time in millis
+   * @param message  the output message; its metadata may carry no begin instant (the field defaults to null)
+   * @return the same message
+   */
+  @Override
+  public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
+    String beginProcessing = message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant;
+    outputEvents.inc();
+    if (beginProcessing != null) { // Instant.parse(null) throws NPE, so skip timing when no stamp is present
+      processingTime.update(Duration.between(Instant.parse(beginProcessing), Instant.now()).toMillis());
+    }
+    return message;
+  }
+} // TranslatorOutputMetricsMapFunction

http://git-wip-us.apache.org/repos/asf/samza/blob/970dfe57/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index 090fb90..233fca4 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -36,10 +36,15 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
+import org.apache.samza.sql.testutil.TestMetricsRegistryImpl;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
@@ -91,6 +96,8 @@ public class TestJoinTranslator extends TranslatorTestBase {
 
   private void testTranslateStreamToTableJoin(boolean isRemoteTable) throws IOException, ClassNotFoundException {
     // setup mock values to the constructor of JoinTranslator
+    final String logicalOpId = "sql0_join3";
+    final int queryId = 0;
     LogicalJoin mockJoin = PowerMockito.mock(LogicalJoin.class);
     TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
     RelNode mockLeftInput = PowerMockito.mock(EnumerableTableScan.class);
@@ -194,9 +201,27 @@ public class TestJoinTranslator extends TranslatorTestBase {
     when(ssConfigBySource.get(String.join(".", qualifiedTableName))).thenReturn(mockIOConfig);
     when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));
 
+    JoinTranslator joinTranslator = new JoinTranslator(logicalOpId, "", queryId);
+
+    // Verify Metrics Works with Join
+    Context mockContext = mock(Context.class);
+    ContainerContext mockContainerContext = mock(ContainerContext.class);
+    TestMetricsRegistryImpl testMetricsRegistryImpl = new TestMetricsRegistryImpl();
+    when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
+    when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(testMetricsRegistryImpl);
+    TranslatorInputMetricsMapFunction inputMetricsMF = joinTranslator.getInputMetricsMF();
+    assertNotNull(inputMetricsMF);
+    inputMetricsMF.init(mockContext);
+    TranslatorOutputMetricsMapFunction outputMetricsMF = joinTranslator.getOutputMetricsMF();
+    assertNotNull(outputMetricsMF);
+    outputMetricsMF.init(mockContext);
+    assertEquals(1, testMetricsRegistryImpl.getCounters().size());
+    assertEquals(2, testMetricsRegistryImpl.getCounters().get(logicalOpId).size());
+    assertEquals(0, testMetricsRegistryImpl.getCounters().get(logicalOpId).get(0).getCount());
+    assertEquals(0, testMetricsRegistryImpl.getCounters().get(logicalOpId).get(1).getCount());
+    assertEquals(1, testMetricsRegistryImpl.getGauges().size());
+
     // Apply translate() method to verify that we are getting the correct map operator constructed
-    String logicalOpId = "sql0_join3";
-    JoinTranslator joinTranslator = new JoinTranslator(logicalOpId, "", 0);
     joinTranslator.translate(mockJoin, mockTranslatorContext);
     // make sure that context has been registered with LogicFilter and output message streams
     verify(mockTranslatorContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));
@@ -210,6 +235,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
     // Verify joinSpec has the corresponding setup
     StreamTableJoinFunction joinFn = joinSpec.getJoinFn();
     assertNotNull(joinFn);
+
     if (isRemoteTable) {
       assertTrue(joinFn instanceof SamzaSqlRemoteTableJoinFunction);
     } else {