You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2019/03/05 00:21:55 UTC

[samza] branch master updated: Propagating the system messages to the stream. (#937)

This is an automated email from the ASF dual-hosted git repository.

srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b8dc03  Propagating the system messages to the stream. (#937)
3b8dc03 is described below

commit 3b8dc03b22bf37aeee192190daf26653de5d61ea
Author: Srinivasulu Punuru <sr...@users.noreply.github.com>
AuthorDate: Mon Mar 4 16:21:51 2019 -0800

    Propagating the system messages to the stream. (#937)
    
    * Adding system messages to the stream
    
    * checkstyle fixes
---
 .../samza/sql/data/SamzaSqlRelMsgMetadata.java     | 24 +++++-
 .../sql/runner/SamzaSqlApplicationConfig.java      |  8 ++
 .../samza/sql/translator/ProjectTranslator.java    |  1 -
 .../samza/sql/translator/QueryTranslator.java      | 12 +++
 .../samza/sql/translator/ScanTranslator.java       | 98 +++++++++++++++-------
 .../TranslatorInputMetricsMapFunction.java         |  3 +-
 .../TranslatorOutputMetricsMapFunction.java        |  2 +-
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  | 25 +++++-
 8 files changed, 133 insertions(+), 40 deletions(-)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
index 713ecbe..14f2892 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
@@ -20,6 +20,8 @@
 package org.apache.samza.sql.data;
 
 import java.io.Serializable;
+import java.time.Instant;
+import org.codehaus.jackson.annotate.JsonIgnore;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 
@@ -36,9 +38,16 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
   public boolean isNewInputMessage = true;
 
   /**
-   *
+   * Indicates whether the SamzaSqlMessage is a system message or not.
    */
-  public String operatorBeginProcessingInstant = null;
+  @JsonIgnore
+  private boolean isSystemMessage = false;
+
+  /**
+   * Time at which the join operation started for the message.
+   * If there is no join node in the operator graph, this will be -1.
+   */
+  public long joinStartTimeMs = -1;
 
 
   /**
@@ -93,7 +102,6 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
 
   public boolean hasArrivalTime() { return arrivalTime != null && !arrivalTime.isEmpty(); }
 
-
   @JsonProperty("scanTime")
   public String getscanTime() { return scanTime;}
 
@@ -103,6 +111,16 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
 
   public boolean hasScanTime() { return scanTime != null && !scanTime.isEmpty(); }
 
+  @JsonIgnore
+  public  void setIsSystemMessage(boolean isSystemMessage) {
+    this.isSystemMessage = isSystemMessage;
+  }
+
+  @JsonIgnore
+  public boolean isSystemMessage() {
+    return isSystemMessage;
+  }
+
   @Override
   public String toString() {
     return "[Metadata:{" + eventTime + " " + arrivalTime + " " + scanTime + "}]";
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 4883dfb..d521681 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -89,6 +89,7 @@ public class SamzaSqlApplicationConfig {
 
   public static final String CFG_METADATA_TOPIC_PREFIX = "samza.sql.metadataTopicPrefix";
   public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms";
+  public static final String CFG_SQL_PROCESS_SYSTEM_EVENTS = "samza.sql.processSystemEvents";
 
   public static final String SAMZA_SYSTEM_LOG = "log";
 
@@ -115,6 +116,7 @@ public class SamzaSqlApplicationConfig {
 
   private final String metadataTopicPrefix;
   private final long windowDurationMs;
+  private final boolean processSystemEvents;
 
   public SamzaSqlApplicationConfig(Config staticConfig, List<String> inputSystemStreams,
       List<String> outputSystemStreams) {
@@ -165,6 +167,8 @@ public class SamzaSqlApplicationConfig {
 
     metadataTopicPrefix =
         staticConfig.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
+
+    processSystemEvents = staticConfig.getBoolean(CFG_SQL_PROCESS_SYSTEM_EVENTS, true);
     windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
   }
 
@@ -324,4 +328,8 @@ public class SamzaSqlApplicationConfig {
   public long getWindowDurationMs() {
     return windowDurationMs;
   }
+
+  public boolean isProcessSystemEvents() {
+    return processSystemEvents;
+  }
 }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 3378788..6e6ff45 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -193,5 +193,4 @@ class ProjectTranslator {
     context.registerMessageStream(project.getId(), outputStream);
     context.registerRelNode(project.getId(), project);
   }
-
 }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index d3c8fa9..ce4737c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -50,6 +50,7 @@ import org.apache.samza.metrics.SamzaHistogram;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -293,6 +294,17 @@ public class QueryTranslator {
       GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
       OutputStream stm = outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v -> appDesc.getOutputStream(osd));
       outputStream.sendTo(stm);
+
+      // Process system events only if the output is a stream.
+      if (sqlConfig.isProcessSystemEvents()) {
+        for( MessageStream<SamzaSqlInputMessage> inputStream :  inputMsgStreams.values()) {
+          MessageStream<KV<Object, Object>> systemEventStream =
+              inputStream.filter(message -> message.getMetadata().isSystemMessage())
+                  .map(SamzaSqlInputMessage::getKeyAndMessageKV);
+
+          systemEventStream.sendTo(stm);
+        }
+      }
     } else {
       Table outputTable = appDesc.getTable(tableDescriptor.get());
       if (outputTable == null) {
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index e044f6f..5fa04d8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -1,21 +1,21 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.samza.sql.translator;
 
@@ -36,8 +36,8 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.sql.SamzaSqlInputTransformer;
 import org.apache.samza.sql.SamzaSqlInputMessage;
+import org.apache.samza.sql.SamzaSqlInputTransformer;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
@@ -48,6 +48,7 @@ import org.apache.samza.system.descriptors.InputTransformer;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 
+
 /**
  * Translator to translate the TableScans in relational graph to the corresponding input streams in the StreamGraph
  * implementation
@@ -78,7 +79,7 @@ class ScanTranslator {
 
     @Override
     public boolean apply(SamzaSqlInputMessage samzaSqlInputMessage) {
-      return !relConverter.isSystemMessage(samzaSqlInputMessage.getKeyAndMessageKV());
+      return !samzaSqlInputMessage.getMetadata().isSystemMessage();
     }
   }
 
@@ -147,11 +148,11 @@ class ScanTranslator {
       queryInputEvents.inc();
       processingTime.update(Duration.between(startProcessing, endProcessing).toMillis());
     }
-
   } // ScanMapFunction
 
-  void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId, final TranslatorContext context,
-      Map<String, DelegatingSystemDescriptor> systemDescriptors, Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) {
+  void translate(final TableScan tableScan, final String queryLogicalId, final String logicalOpId,
+      final TranslatorContext context, Map<String, DelegatingSystemDescriptor> systemDescriptors,
+      Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) {
     StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
     List<String> tableNameParts = tableScan.getTable().getQualifiedName();
     String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
@@ -162,9 +163,9 @@ class ScanTranslator {
     final String streamId = sqlIOConfig.getStreamId();
     final String source = sqlIOConfig.getSource();
 
-    final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() &&
-        (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
-            sqlIOConfig.getTableDescriptor().get() instanceof CachingTableDescriptor);
+    final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() && (
+        sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor || sqlIOConfig.getTableDescriptor()
+            .get() instanceof CachingTableDescriptor);
 
     // For remote table, we don't have an input stream descriptor. The table descriptor is already defined by the
     // SqlIOResolverFactory.
@@ -181,22 +182,55 @@ class ScanTranslator {
       systemDescriptors.put(systemName, systemDescriptor);
     } else {
       /* in SamzaSQL, there should be no systemDescriptor setup by user, so this branch happens only
-      * in case of Fan-OUT (i.e., same input stream used in multiple sql statements), or when same input
-      * used twice in same sql statement (e.g., select ... from input as i1, input as i2 ...), o.w., throw error */
+       * in case of Fan-OUT (i.e., same input stream used in multiple sql statements), or when same input
+       * used twice in same sql statement (e.g., select ... from input as i1, input as i2 ...), o.w., throw error */
       if (systemDescriptor.getTransformer().isPresent()) {
         InputTransformer existingTransformer = systemDescriptor.getTransformer().get();
         if (!(existingTransformer instanceof SamzaSqlInputTransformer)) {
-          throw new SamzaException("SamzaSQL Exception: existing transformer for " + systemName + " is not SamzaSqlInputTransformer");
+          throw new SamzaException(
+              "SamzaSQL Exception: existing transformer for " + systemName + " is not SamzaSqlInputTransformer");
         }
       }
     }
 
     InputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, new NoOpSerde<>());
-    MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream =
-        inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(inputDescriptor))
-            .filter(new FilterSystemMessageFunction(sourceName, queryId))
-            .map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId));
+
+    if (!inputMsgStreams.containsKey(source)) {
+      MessageStream<SamzaSqlInputMessage> inputMsgStream = streamAppDesc.getInputStream(inputDescriptor);
+      inputMsgStreams.put(source, inputMsgStream.map(new SystemMessageMapperFunction(source, queryId)));
+    }
+    MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputMsgStreams.get(source)
+        .filter(new FilterSystemMessageFunction(sourceName, queryId))
+        .map(new ScanMapFunction(sourceName, queryId, queryLogicalId, logicalOpId));
 
     context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);
   }
+
+  /**
+   * Function that populates whether the message is a system message.
+   * TODO This should ideally be populated by the InputTransformer in future.
+   */
+  private static class SystemMessageMapperFunction implements MapFunction<SamzaSqlInputMessage, SamzaSqlInputMessage> {
+    private final String source;
+    private final int queryId;
+    private transient SamzaRelConverter relConverter;
+
+    public SystemMessageMapperFunction(String source, int queryId) {
+      this.source = source;
+      this.queryId = queryId;
+    }
+
+    @Override
+    public void init(Context context) {
+      TranslatorContext translatorContext =
+          ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
+      relConverter = translatorContext.getMsgConverter(source);
+    }
+
+    @Override
+    public SamzaSqlInputMessage apply(SamzaSqlInputMessage message) {
+      message.getMetadata().setIsSystemMessage(relConverter.isSystemMessage(message.getKeyAndMessageKV()));
+      return message;
+    }
+  }
 }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
index bb3300a..ef3028e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.sql.translator;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.time.Instant;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
@@ -60,7 +59,7 @@ class TranslatorInputMetricsMapFunction implements MapFunction<SamzaSqlRelMessag
   @Override
   public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
     inputEvents.inc();
-    message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant = Instant.now().toString();
+    message.getSamzaSqlRelMsgMetadata().joinStartTimeMs = Instant.now().toEpochMilli();
     return message;
   }
 
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
index f1757fb..3e85bed 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
@@ -63,7 +63,7 @@ class TranslatorOutputMetricsMapFunction implements MapFunction<SamzaSqlRelMessa
   @Override
   public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
     Instant endProcessing = Instant.now();
-    Instant beginProcessing = Instant.parse(message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant);
+    Instant beginProcessing = Instant.ofEpochMilli(message.getSamzaSqlRelMsgMetadata().joinStartTimeMs);
     outputEvents.inc();
     processingTime.update(Duration.between(beginProcessing, endProcessing).toMillis());
     return message;
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index e69ae9a..e0264ee 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -107,6 +107,29 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
         .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
         .sorted()
         .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+  }
+
+  @Test
+  public void testEndToEndDisableSystemMessages() {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String avroSamzaToRelMsgConverterDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
+    staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        SampleRelConverterFactory.class.getName());
+    String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_PROCESS_SYSTEM_EVENTS, "false");
+    runApplication(new MapConfig(staticConfigs));
+
+    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+        .sorted()
+        .collect(Collectors.toList());
     Assert.assertEquals((numMessages + 1) / 2, outMessages.size());
   }
 
@@ -174,7 +197,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     Assert.assertEquals(numMessages, outMessagesSet.size());
     Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet)));
   }
-  
+
   @Test
   public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
     int numMessages = 20;