You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/10/12 20:13:19 UTC

samza git commit: SAMZA-1937: Make DSL converter work when sql is a query

Repository: samza
Updated Branches:
  refs/heads/master 954fb25e7 -> e5ea9bef1


SAMZA-1937: Make DSL converter work when sql is a query

## What changes were proposed in this pull request?
Support the use case where users run a SELECT query from the Samza SQL Console or the Samza SQL shell.

## How was this patch tested?
1. Build passed and current unit tests passed
2. Testing in Samza Sql shell

Author: Weiqing Yang <ya...@gmail.com>

Reviewers: Aditya Toomula <at...@linkedin.com>

Closes #721 from weiqingy/SAMZA-1937 and squashes the following commits:

a16f717e [Weiqing Yang] Fix review comments
6bbf0b8a [Weiqing Yang] fix review comments
4658a42f [Weiqing Yang] make DSL converter work when sql is a query


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e5ea9bef
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e5ea9bef
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e5ea9bef

Branch: refs/heads/master
Commit: e5ea9bef1d0f363b8c919565ee1919cd0ac248fe
Parents: 954fb25
Author: Weiqing Yang <ya...@gmail.com>
Authored: Fri Oct 12 13:13:12 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Fri Oct 12 13:13:12 2018 -0700

----------------------------------------------------------------------
 .../samza/sql/dsl/SamzaSqlDslConverter.java     | 22 ++++---
 .../sql/runner/SamzaSqlApplicationConfig.java   | 26 +++++++-
 .../samza/sql/translator/QueryTranslator.java   | 62 ++++++++++++++++++++
 .../org/apache/samza/tools/SamzaSqlConsole.java |  5 +-
 4 files changed, 101 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e5ea9bef/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
index 4ec6f4a..6cce906 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -37,8 +37,6 @@ import org.apache.samza.sql.testutil.SqlFileParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*;
-
 
 public class SamzaSqlDslConverter implements DslConverter {
 
@@ -66,6 +64,12 @@ public class SamzaSqlDslConverter implements DslConverter {
 
     List<RelRoot> relRoots = new LinkedList<>();
     for (String sql: sqlStmts) {
+      // when sql is a query, we only pass the select query to the planner
+      SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql);
+      if (qinfo.getSink().split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) {
+        sql = qinfo.getSelectQuery();
+      }
+
       relRoots.add(planner.plan(sql));
     }
     return relRoots;
@@ -77,13 +81,15 @@ public class SamzaSqlDslConverter implements DslConverter {
 
   public static List<String> fetchSqlFromConfig(Map<String, String> config) {
     List<String> sql;
-    if (config.containsKey(CFG_SQL_STMT) && StringUtils.isNotBlank(config.get(CFG_SQL_STMT))) {
-      String sqlValue = config.get(CFG_SQL_STMT);
+    if (config.containsKey(SamzaSqlApplicationConfig.CFG_SQL_STMT) &&
+        StringUtils.isNotBlank(config.get(SamzaSqlApplicationConfig.CFG_SQL_STMT))) {
+      String sqlValue = config.get(SamzaSqlApplicationConfig.CFG_SQL_STMT);
       sql = Collections.singletonList(sqlValue);
-    } else if (config.containsKey(CFG_SQL_STMTS_JSON) && StringUtils.isNotBlank(config.get(CFG_SQL_STMTS_JSON))) {
-      sql = deserializeSqlStmts(config.get(CFG_SQL_STMTS_JSON));
-    } else if (config.containsKey(CFG_SQL_FILE)) {
-      String sqlFile = config.get(CFG_SQL_FILE);
+    } else if (config.containsKey(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON) &&
+        StringUtils.isNotBlank(config.get(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON))) {
+      sql = SamzaSqlApplicationConfig.deserializeSqlStmts(config.get(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON));
+    } else if (config.containsKey(SamzaSqlApplicationConfig.CFG_SQL_FILE)) {
+      String sqlFile = config.get(SamzaSqlApplicationConfig.CFG_SQL_FILE);
       sql = SqlFileParser.parseSqlFile(sqlFile);
     } else {
       String msg = "Config doesn't contain the SQL that needs to be executed.";

http://git-wip-us.apache.org/repos/asf/samza/blob/e5ea9bef/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 415cfdd..17df373 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
 import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
 import org.apache.samza.sql.interfaces.DslConverter;
@@ -52,6 +53,7 @@ import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.apache.samza.sql.interfaces.UdfResolver;
 import org.apache.samza.sql.testutil.JsonUtil;
 import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,6 +84,8 @@ public class SamzaSqlApplicationConfig {
 
   public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms";
 
+  public static final String SAMZA_SYSTEM_LOG = "log";
+
   private static final long DEFAULT_GROUPBY_WINDOW_DURATION_MS = 300000; // default groupby window duration is 5 mins.
 
   private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
@@ -130,6 +134,13 @@ public class SamzaSqlApplicationConfig {
     udfMetadata = udfResolver.getUdfs();
 
     windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
+
+    // remove from systemStreamConfigsBySource the SqlIOConfigs of outputs whose system is "log"
+    outputSystemStreamConfigsBySource.forEach((k, v) -> {
+        if (k.split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) {
+            systemStreamConfigsBySource.remove(k);
+        }
+    });
   }
 
   private static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
@@ -202,9 +213,18 @@ public class SamzaSqlApplicationConfig {
 
     Collection<RelRoot> relRoots = dslConverter.convertDsl(String.join("\n", dslStmts));
 
-    for (RelRoot relRoot : relRoots) {
-      SamzaSqlApplicationConfig.populateSystemStreams(relRoot.project(), inputSystemStreams, outputSystemStreams);
-    }
+    // FIXME: the snippet below does not work when sql is a query
+    // for (RelRoot relRoot : relRoots) {
+    //   SamzaSqlApplicationConfig.populateSystemStreams(relRoot.project(), inputSystemStreams, outputSystemStreams);
+    // }
+
+    // RelRoot does not have a sink node (aka. log.outputStream) when the SQL statement is a query, so we
+    // cannot traverse the tree of relRoot to get "outputSystemStreams"
+    List<String> sqlStmts = SamzaSqlDslConverter.fetchSqlFromConfig(config);
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = SamzaSqlDslConverter.fetchQueryInfo(sqlStmts);
+    inputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+          .collect(Collectors.toSet()));
+    outputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
 
     return relRoots;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e5ea9bef/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index b13043f..817f145 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -20,6 +20,7 @@
 package org.apache.samza.sql.translator;
 
 import java.util.Map;
+import java.util.Optional;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelShuttleImpl;
@@ -34,14 +35,23 @@ import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.table.Table;
+
 
 /**
  * This class is used to populate the {@link StreamApplicationDescriptor} using the SQL queries.
@@ -54,6 +64,27 @@ public class QueryTranslator {
   private final SamzaSqlApplicationConfig sqlConfig;
   private final Map<String, SamzaRelConverter> converters;
 
+  private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
+    private transient SamzaRelConverter samzaMsgConverter;
+    private final String outputTopic;
+
+    OutputMapFunction(String outputTopic) {
+      this.outputTopic = outputTopic;
+    }
+
+    @Override
+    public void init(Context context) {
+      TranslatorContext translatorContext =
+          ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext();
+      this.samzaMsgConverter = translatorContext.getMsgConverter(outputTopic);
+    }
+
+    @Override
+    public KV<Object, Object> apply(SamzaSqlRelMessage message) {
+      return this.samzaMsgConverter.convertToSamzaMessage(message);
+    }
+  }
+
   public QueryTranslator(SamzaSqlApplicationConfig sqlConfig) {
     this.sqlConfig = sqlConfig;
     scanTranslator =
@@ -136,6 +167,15 @@ public class QueryTranslator {
       }
     });
 
+    // the snippet below will be performed only when sql is a query statement
+    sqlConfig.getOutputSystemStreamConfigsBySource().keySet().forEach(
+        key -> {
+          if (key.split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) {
+            sendToOutputStream(appDesc, translatorContext, node, key);
+          }
+        }
+    );
+
     /*
      * TODO When serialization of ApplicationDescriptor is actually needed, then something will need to be updated here,
      * since translatorContext is not Serializable. Currently, a new ApplicationDescriptor instance is created in each
@@ -148,4 +188,26 @@ public class QueryTranslator {
         applicationContainerContext) ->
         new SamzaSqlApplicationContext(translatorContext.clone()));
   }
+
+  private void sendToOutputStream(StreamApplicationDescriptor appDesc, TranslatorContext context, RelNode node, String sink) {
+    SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sink);
+    MessageStream<SamzaSqlRelMessage> stream = context.getMessageStream(node.getId());
+    MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(sink));
+    Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
+    if (!tableDescriptor.isPresent()) {
+      KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+      String systemName = sinkConfig.getSystemName();
+      DelegatingSystemDescriptor
+          sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
+      GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde);
+      outputStream.sendTo(appDesc.getOutputStream(osd));
+    } else {
+      Table outputTable = appDesc.getTable(tableDescriptor.get());
+      if (outputTable == null) {
+        String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
+        throw new SamzaException(msg);
+      }
+      outputStream.sendTo(outputTable);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e5ea9bef/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
index bfd217c..bc44470 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/SamzaSqlConsole.java
@@ -67,7 +67,6 @@ public class SamzaSqlConsole {
   private static final String OPT_DESC_SQL_STMT = "SQL statement to execute.";
 
   private static final String SAMZA_SYSTEM_KAFKA = "kafka";
-  private static final String SAMZA_SYSTEM_LOG = "log";
 
   public static void main(String[] args) {
     Options options = new Options();
@@ -152,8 +151,8 @@ public class SamzaSqlConsole {
     staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
 
     String logSystemConfigPrefix =
-        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG);
-    String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG);
+        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG);
+    String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG);
     staticConfigs.put(logSystemConfigPrefix + "samza.factory", ConsoleLoggingSystemFactory.class.getName());
     staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "json");
     staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");