You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:11 UTC
[12/29] samza git commit: Samza SQL: Code re-org to accomodate Samza
SQL engine to take Calcite IR as input
Samza SQL: Code re-org so that the Samza SQL engine can take Calcite IR as input
This PR makes the following changes:
- Let QueryTranslator take Calcite IR as input
- Include the 'INSERT INTO' SQL statement in the Calcite plan
- Add a basic DslConverter framework, with the SamzaSQL dialect as an example
- Some fixes to stream-table join with respect to Serde
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Srinivasulu <sp...@linkedin.com>, Weiqing <wi...@linkedin.com>
Closes #630 from atoomula/dsl3 and squashes the following commits:
93c66cee [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
21c0175b [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
15a1e9fb [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
5bf0c7e1 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
98cd9777 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
63a66fb1 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
6794b512 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
c9d434a9 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
94e53b64 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
30c76ed9 [Aditya Toomula] Samza SQL: Code re-org to accomodate Samza SQL engine to take Calcite IR as input.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dec16392
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dec16392
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dec16392
Branch: refs/heads/NewKafkaSystemConsumer
Commit: dec16392de2f5d323b6b1b3acf8de1689038f44d
Parents: db6996e
Author: Aditya Toomula <at...@linkedin.com>
Authored: Thu Sep 20 14:22:38 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Thu Sep 20 14:22:38 2018 -0700
----------------------------------------------------------------------
.../samza/sql/data/RexToJavaCompiler.java | 5 +-
.../samza/sql/dsl/SamzaSqlDslConverter.java | 96 ++++++
.../sql/dsl/SamzaSqlDslConverterFactory.java | 33 ++
.../samza/sql/interfaces/DslConverter.java | 37 ++
.../sql/interfaces/DslConverterFactory.java | 36 ++
.../samza/sql/interfaces/SamzaSqlDriver.java | 56 +++
.../interfaces/SamzaSqlJavaTypeFactoryImpl.java | 72 ++++
.../samza/sql/runner/SamzaSqlApplication.java | 30 +-
.../sql/runner/SamzaSqlApplicationConfig.java | 117 ++++---
.../sql/runner/SamzaSqlApplicationRunner.java | 41 ++-
.../samza/sql/testutil/SamzaSqlQueryParser.java | 21 +-
.../samza/sql/translator/JoinTranslator.java | 1 +
.../samza/sql/translator/ModifyTranslator.java | 117 +++++++
.../samza/sql/translator/QueryTranslator.java | 90 ++---
.../samza/sql/translator/ScanTranslator.java | 10 +-
.../apache/samza/sql/e2e/TestSamzaSqlTable.java | 4 +-
.../runner/TestSamzaSqlApplicationConfig.java | 49 ++-
.../runner/TestSamzaSqlApplicationRunner.java | 2 +-
.../samza/sql/system/TestAvroSystemFactory.java | 3 +-
.../samza/sql/testutil/SamzaSqlTestConfig.java | 3 +
.../sql/testutil/TestSamzaSqlFileParser.java | 1 +
.../sql/translator/TestQueryTranslator.java | 345 +++++++++++++------
.../test/samzasql/TestSamzaSqlEndToEnd.java | 64 +++-
23 files changed, 948 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
index 21c81a8..1cfa95f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
@@ -49,6 +49,7 @@ import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.util.Pair;
import org.apache.samza.SamzaException;
+import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.commons.compiler.CompilerFactoryFactory;
import org.codehaus.commons.compiler.IClassBodyEvaluator;
@@ -114,11 +115,11 @@ public class RexToJavaCompiler {
final ParameterExpression root = DataContext.ROOT;
final ParameterExpression inputValues = Expressions.parameter(Object[].class, "inputValues");
final ParameterExpression outputValues = Expressions.parameter(Object[].class, "outputValues");
- final JavaTypeFactoryImpl javaTypeFactory = new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+ final JavaTypeFactoryImpl javaTypeFactory = new SamzaSqlJavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
// public void execute(Object[] inputValues, Object[] outputValues)
final RexToLixTranslator.InputGetter inputGetter = new RexToLixTranslator.InputGetterImpl(ImmutableList.of(
- Pair.<org.apache.calcite.linq4j.tree.Expression, PhysType>of(
+ Pair.of(
Expressions.variable(Object[].class, "inputValues"),
PhysTypeImpl.of(javaTypeFactory, inputRowType, JavaRowFormat.ARRAY, false))));
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
new file mode 100644
index 0000000..4ec6f4a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.dsl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.DslConverter;
+import org.apache.samza.sql.planner.QueryPlanner;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.testutil.SqlFileParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*;
+
+
+public class SamzaSqlDslConverter implements DslConverter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlDslConverter.class);
+
+ private final Config config;
+
+ SamzaSqlDslConverter(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Collection<RelRoot> convertDsl(String dsl) {
+ // TODO: Introduce an API to parse a dsl string and return one or more sql statements
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config,
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toSet()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
+ QueryPlanner planner =
+ new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(),
+ sqlConfig.getUdfMetadata());
+
+ List<RelRoot> relRoots = new LinkedList<>();
+ for (String sql: sqlStmts) {
+ relRoots.add(planner.plan(sql));
+ }
+ return relRoots;
+ }
+
+ public static List<SamzaSqlQueryParser.QueryInfo> fetchQueryInfo(List<String> sqlStmts) {
+ return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
+ }
+
+ public static List<String> fetchSqlFromConfig(Map<String, String> config) {
+ List<String> sql;
+ if (config.containsKey(CFG_SQL_STMT) && StringUtils.isNotBlank(config.get(CFG_SQL_STMT))) {
+ String sqlValue = config.get(CFG_SQL_STMT);
+ sql = Collections.singletonList(sqlValue);
+ } else if (config.containsKey(CFG_SQL_STMTS_JSON) && StringUtils.isNotBlank(config.get(CFG_SQL_STMTS_JSON))) {
+ sql = deserializeSqlStmts(config.get(CFG_SQL_STMTS_JSON));
+ } else if (config.containsKey(CFG_SQL_FILE)) {
+ String sqlFile = config.get(CFG_SQL_FILE);
+ sql = SqlFileParser.parseSqlFile(sqlFile);
+ } else {
+ String msg = "Config doesn't contain the SQL that needs to be executed.";
+ LOG.error(msg);
+ throw new SamzaException(msg);
+ }
+
+ return sql;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java
new file mode 100644
index 0000000..5176453
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverterFactory.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.dsl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.DslConverter;
+import org.apache.samza.sql.interfaces.DslConverterFactory;
+
+
+public class SamzaSqlDslConverterFactory implements DslConverterFactory {
+
+ @Override
+ public DslConverter create(Config config) {
+ return new SamzaSqlDslConverter(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
new file mode 100644
index 0000000..fc2ca8e
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import java.util.Collection;
+import org.apache.calcite.rel.RelRoot;
+
+
+/**
+ * Samza SQL Application uses a {@link DslConverter} to convert its input DSL into Calcite logical plans.
+ */
+public interface DslConverter {
+
+ /**
+  * Converts the DSL into Calcite logical plans.
+  *
+  * @param dsl the DSL text to convert
+  * @return the root nodes of the Calcite logical plans. If the DSL represents multiple SQL statements,
+  *     an implementation may return one root node per SQL statement.
+  */
+ Collection<RelRoot> convertDsl(String dsl);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java
new file mode 100644
index 0000000..d42a96f
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverterFactory.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory used to create {@link DslConverter} instances from the application config.
+ */
+public interface DslConverterFactory {
+
+ /**
+  * Creates a {@link DslConverter} for the given config.
+  *
+  * @param config config needed to create the {@link DslConverter}
+  * @return the {@link DslConverter} that was created
+  */
+ DslConverter create(Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java
new file mode 100644
index 0000000..5c86df9
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlDriver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.sql.interfaces;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.ConnectStringParser;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.jdbc.CalciteFactory;
+import org.apache.calcite.jdbc.Driver;
+
+
+/**
+ * Calcite JDBC driver for SamzaSQL that takes in a {@link JavaTypeFactory}.
+ *
+ * <p>Every connection created by {@link #connect(String, Properties)} is constructed with the
+ * {@link JavaTypeFactory} supplied to this driver.
+ */
+public class SamzaSqlDriver extends Driver {
+
+  // Set once at construction and never reassigned.
+  private final JavaTypeFactory typeFactory;
+
+  /**
+   * @param typeFactory type factory passed to every connection this driver creates
+   */
+  public SamzaSqlDriver(JavaTypeFactory typeFactory) {
+    this.typeFactory = typeFactory;
+  }
+
+  /**
+   * Opens a Calcite connection that uses this driver's type factory.
+   *
+   * @param url JDBC URL; it must start with this driver's connect-string prefix to be accepted
+   * @param info connection properties, parsed together with the URL suffix by {@link ConnectStringParser}
+   * @return the new connection, or {@code null} if this driver does not accept {@code url}
+   * @throws SQLException if URL checking or connection creation fails
+   */
+  @Override
+  public Connection connect(String url, Properties info) throws SQLException {
+    if (!acceptsURL(url)) {
+      // The java.sql.Driver contract requires returning null (not throwing) for URLs this driver does not handle.
+      return null;
+    }
+    final String prefix = getConnectStringPrefix();
+    assert url.startsWith(prefix);
+    final String urlSuffix = url.substring(prefix.length());
+    final Properties info2 = ConnectStringParser.parse(urlSuffix, info);
+    final AvaticaConnection connection =
+        ((CalciteFactory) factory).newConnection(this, factory, url, info2, null, typeFactory);
+    handler.onConnectionInit(connection);
+    return connection;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
new file mode 100644
index 0000000..50001c6
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.sql.interfaces;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.JavaToSqlTypeConversionRules;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+
+/**
+ * Calcite does validation of projected field types in select statement with the output schema types. If one of the
+ * projected fields is an UDF with return type of {@link Object} or any other java type not defined in
+ * {@link JavaToSqlTypeConversionRules}, using the default {@link JavaTypeFactoryImpl} results in validation failure.
+ * Hence, extending {@link JavaTypeFactoryImpl} to make Calcite validation work with all output types of Samza SQL UDFs.
+ */
+public class SamzaSqlJavaTypeFactoryImpl
+ extends JavaTypeFactoryImpl {
+
+ public SamzaSqlJavaTypeFactoryImpl() {
+ this(RelDataTypeSystem.DEFAULT);
+ }
+
+ public SamzaSqlJavaTypeFactoryImpl(RelDataTypeSystem typeSystem) {
+ super(typeSystem);
+ }
+
+ @Override
+ public RelDataType toSql(RelDataType type) {
+ return toSql(this, type);
+ }
+
+ /** Converts a type in Java format to a SQL-oriented type. */
+ public static RelDataType toSql(final RelDataTypeFactory typeFactory,
+ RelDataType type) {
+ if (type instanceof RelRecordType) {
+ return typeFactory.createStructType(
+ Lists.transform(type.getFieldList(), a0 -> toSql(typeFactory, a0.getType())),
+ type.getFieldNames());
+ }
+ if (type instanceof JavaType) {
+ SqlTypeName typeName = JavaToSqlTypeConversionRules.instance().lookup(((JavaType) type).getJavaClass());
+ // For unknown sql type names, return ANY sql type to make Calcite validation not fail.
+ if (typeName == null) {
+ typeName = SqlTypeName.ANY;
+ }
+ return typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(typeName),
+ type.isNullable());
+ } else {
+ return JavaTypeFactoryImpl.toSql(typeFactory, type);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index 9a871d7..fd1a2a8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -19,10 +19,14 @@
package org.apache.samza.sql.runner;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import org.apache.calcite.rel.RelRoot;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
import org.apache.samza.sql.translator.QueryTranslator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,12 +42,26 @@ public class SamzaSqlApplication implements StreamApplication {
@Override
public void describe(StreamApplicationDescriptor appDesc) {
try {
- SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(appDesc.getConfig());
+ // TODO: Introduce an API to return a dsl string containing one or more sql statements.
+ List<String> dslStmts = SamzaSqlDslConverter.fetchSqlFromConfig(appDesc.getConfig());
+
+ // 1. Get Calcite plan
+ Set<String> inputSystemStreams = new HashSet<>();
+ Set<String> outputSystemStreams = new HashSet<>();
+
+ Collection<RelRoot> relRoots =
+ SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, appDesc.getConfig(),
+ inputSystemStreams, outputSystemStreams);
+
+ // 2. Populate configs
+ SamzaSqlApplicationConfig sqlConfig =
+ new SamzaSqlApplicationConfig(appDesc.getConfig(), inputSystemStreams, outputSystemStreams);
+
+ // 3. Translate Calcite plan to Samza stream operators
QueryTranslator queryTranslator = new QueryTranslator(sqlConfig);
- List<SamzaSqlQueryParser.QueryInfo> queries = sqlConfig.getQueryInfo();
- for (SamzaSqlQueryParser.QueryInfo query : queries) {
- LOG.info("Translating the query {} to samza stream graph", query.getSelectQuery());
- queryTranslator.translate(query, appDesc);
+ for (RelRoot relRoot : relRoots) {
+ LOG.info("Translating relRoot {} to samza stream graph", relRoot);
+ queryTranslator.translate(relRoot, appDesc);
}
} catch (RuntimeException e) {
LOG.error("SamzaSqlApplication threw exception.", e);
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 997312f..415cfdd 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -20,7 +20,6 @@
package org.apache.samza.sql.runner;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -30,12 +29,18 @@ import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.core.TableModify;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
-import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.DslConverter;
+import org.apache.samza.sql.interfaces.DslConverterFactory;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
@@ -47,9 +52,6 @@ import org.apache.samza.sql.interfaces.UdfMetadata;
import org.apache.samza.sql.interfaces.UdfResolver;
import org.apache.samza.sql.testutil.JsonUtil;
import org.apache.samza.sql.testutil.ReflectionUtils;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
-import org.apache.samza.sql.testutil.SqlFileParser;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,37 +94,25 @@ public class SamzaSqlApplicationConfig {
private final Map<String, SqlIOConfig> inputSystemStreamConfigBySource;
private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource;
-
- private final List<String> sql;
-
- private final List<QueryInfo> queryInfo;
+ private final Map<String, SqlIOConfig> systemStreamConfigsBySource;
private final long windowDurationMs;
- public SamzaSqlApplicationConfig(Config staticConfig) {
-
- sql = fetchSqlFromConfig(staticConfig);
-
- queryInfo = fetchQueryInfo(sql);
+ public SamzaSqlApplicationConfig(Config staticConfig, Set<String> inputSystemStreams,
+ Set<String> outputSystemStreams) {
ioResolver = createIOResolver(staticConfig);
- udfResolver = createUdfResolver(staticConfig);
- udfMetadata = udfResolver.getUdfs();
+ inputSystemStreamConfigBySource = inputSystemStreams.stream()
+ .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src)));
- inputSystemStreamConfigBySource = queryInfo.stream()
- .map(QueryInfo::getSources)
- .flatMap(Collection::stream)
- .distinct()
- .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSourceInfo));
+ outputSystemStreamConfigsBySource = outputSystemStreams.stream()
+ .collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x)));
- Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
+ systemStreamConfigsBySource = new HashMap<>(inputSystemStreamConfigBySource);
+ systemStreamConfigsBySource.putAll(outputSystemStreamConfigsBySource);
- outputSystemStreamConfigsBySource = queryInfo.stream()
- .map(QueryInfo::getSink)
- .distinct()
- .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSinkInfo));
- systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
+ Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(systemStreamConfigsBySource.values());
relSchemaProvidersBySource = systemStreamConfigs.stream()
.collect(Collectors.toMap(SqlIOConfig::getSource,
@@ -136,6 +126,9 @@ public class SamzaSqlApplicationConfig {
CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
relSchemaProvidersBySource.get(x.getSource()), c))));
+ udfResolver = createUdfResolver(staticConfig);
+ udfMetadata = udfResolver.getUdfs();
+
windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
}
@@ -151,30 +144,7 @@ public class SamzaSqlApplicationConfig {
return factoryInvoker.apply(factory, pluginConfig);
}
- public static List<QueryInfo> fetchQueryInfo(List<String> sqlStmts) {
- return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
- }
-
- public static List<String> fetchSqlFromConfig(Map<String, String> config) {
- List<String> sql;
- if (config.containsKey(CFG_SQL_STMT) && StringUtils.isNotBlank(config.get(CFG_SQL_STMT))) {
- String sqlValue = config.get(CFG_SQL_STMT);
- sql = Collections.singletonList(sqlValue);
- } else if (config.containsKey(CFG_SQL_STMTS_JSON) && StringUtils.isNotBlank(config.get(CFG_SQL_STMTS_JSON))) {
- sql = deserializeSqlStmts(config.get(CFG_SQL_STMTS_JSON));
- } else if (config.containsKey(CFG_SQL_FILE)) {
- String sqlFile = config.get(CFG_SQL_FILE);
- sql = SqlFileParser.parseSqlFile(sqlFile);
- } else {
- String msg = "Config doesn't contain the SQL that needs to be executed.";
- LOG.error(msg);
- throw new SamzaException(msg);
- }
-
- return sql;
- }
-
- private static List<String> deserializeSqlStmts(String value) {
+ public static List<String> deserializeSqlStmts(String value) {
Validate.notEmpty(value, "json Value is not set or empty");
return JsonUtil.fromJson(value, new TypeReference<List<String>>() {
});
@@ -224,12 +194,45 @@ public class SamzaSqlApplicationConfig {
return ret;
}
- public List<String> getSql() {
- return sql;
+  /**
+   * Converts the SQL statements into Calcite plans and records the system streams the plans read from and
+   * write to.
+   *
+   * @param dslStmts SQL statements; joined with newlines before being handed to the {@link DslConverter}
+   * @param config application config used to create the converter
+   * @param inputSystemStreams out-parameter; receives the names of the streams the plans read from
+   * @param outputSystemStreams out-parameter; receives the names of the streams the plans write to
+   * @return the plan roots returned by the converter
+   */
+  public static Collection<RelRoot> populateSystemStreamsAndGetRelRoots(List<String> dslStmts, Config config,
+      Set<String> inputSystemStreams, Set<String> outputSystemStreams) {
+    // TODO: Get the converter factory based on the file type. Create abstraction around this.
+    DslConverterFactory factory = new SamzaSqlDslConverterFactory();
+    DslConverter converter = factory.create(config);
+
+    String dsl = String.join("\n", dslStmts);
+    Collection<RelRoot> roots = converter.convertDsl(dsl);
+
+    roots.forEach(root -> populateSystemStreams(root.project(), inputSystemStreams, outputSystemStreams));
+    return roots;
+  }
+
+  /**
+   * Walks the plan rooted at {@code relNode} and records the qualified names of the tables it references.
+   * {@link TableModify} nodes add to {@code outputSystemStreams}. Any other node with a non-null table adds to
+   * {@code inputSystemStreams}.
+   *
+   * @param relNode root of the (sub)plan to walk
+   * @param inputSystemStreams receives the names of tables the plan reads from
+   * @param outputSystemStreams receives the names of tables the plan writes to
+   */
+  private static void populateSystemStreams(RelNode relNode, Set<String> inputSystemStreams,
+      Set<String> outputSystemStreams) {
+    if (relNode instanceof TableModify) {
+      outputSystemStreams.add(getSystemStreamName(relNode));
+    } else if (relNode instanceof BiRel) {
+      // Calcite's BiRel#getInputs() is exactly [left, right]. Visit both children here and return, rather than
+      // also visiting them again through getInputs() below (which doubled the work at every join level).
+      BiRel biRelNode = (BiRel) relNode;
+      populateSystemStreams(biRelNode.getLeft(), inputSystemStreams, outputSystemStreams);
+      populateSystemStreams(biRelNode.getRight(), inputSystemStreams, outputSystemStreams);
+      return;
+    } else if (relNode.getTable() != null) {
+      inputSystemStreams.add(getSystemStreamName(relNode));
+    }
+    List<RelNode> relNodes = relNode.getInputs();
+    if (relNodes == null || relNodes.isEmpty()) {
+      return;
+    }
+    relNodes.forEach(node -> populateSystemStreams(node, inputSystemStreams, outputSystemStreams));
}
- public List<QueryInfo> getQueryInfo() {
- return queryInfo;
+  /**
+   * Returns the qualified name of the table referenced by {@code relNode}, with its parts joined by {@code "."}.
+   * The callers in this class only pass {@link TableModify} nodes or nodes whose table is non-null.
+   */
+  private static String getSystemStreamName(RelNode relNode) {
+    // getQualifiedName() already yields strings, so join them directly instead of streaming through toString().
+    return String.join(".", relNode.getTable().getQualifiedName());
}
public Collection<UdfMetadata> getUdfMetadata() {
@@ -244,6 +247,10 @@ public class SamzaSqlApplicationConfig {
return outputSystemStreamConfigsBySource;
}
+  /**
+   * @return the {@link SqlIOConfig} map held in {@code systemStreamConfigsBySource}, keyed by source name
+   *     (NOTE(review): presumably covers both inputs and outputs — confirm against the constructor)
+   */
+  public Map<String, SqlIOConfig> getSystemStreamConfigsBySource() {
+    return this.systemStreamConfigsBySource;
+  }
+
+  /**
+   * @return the {@link SamzaRelConverter}s held in {@code samzaRelConvertersBySource}, keyed by source name
+   */
public Map<String, SamzaRelConverter> getSamzaRelConverters() {
return samzaRelConvertersBySource;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 027fd23..cad032f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -21,20 +21,21 @@ package org.apache.samza.sql.runner;
import java.time.Duration;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang3.Validate;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.job.ApplicationStatus;
-import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.runtime.RemoteApplicationRunner;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.SqlIOResolver;
-import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,25 +64,31 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
public static Config computeSamzaConfigs(Boolean localRunner, Config config) {
Map<String, String> newConfig = new HashMap<>();
- SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(config);
- // Parse the sql and find the input stream streams
- List<String> sqlStmts = SamzaSqlApplicationConfig.fetchSqlFromConfig(config);
+ // TODO: Introduce an API to return a dsl string containing one or more sql statements
+ List<String> dslStmts = SamzaSqlDslConverter.fetchSqlFromConfig(config);
// This is needed because the SQL file may not be available in all the node managers.
- String sqlJson = SamzaSqlApplicationConfig.serializeSqlStmts(sqlStmts);
+ String sqlJson = SamzaSqlApplicationConfig.serializeSqlStmts(dslStmts);
newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, sqlJson);
- List<SamzaSqlQueryParser.QueryInfo> queryInfo = SamzaSqlApplicationConfig.fetchQueryInfo(sqlStmts);
- for (SamzaSqlQueryParser.QueryInfo query : queryInfo) {
- // Populate stream to system mapping config for input and output system streams
- for (String inputSource : query.getSources()) {
- SqlIOConfig inputSystemStreamConfig = ioResolver.fetchSourceInfo(inputSource);
- newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
- inputSystemStreamConfig.getSystemName());
- newConfig.putAll(inputSystemStreamConfig.getConfig());
- }
-
- SqlIOConfig outputSystemStreamConfig = ioResolver.fetchSinkInfo(query.getSink());
+ Set<String> inputSystemStreams = new HashSet<>();
+ Set<String> outputSystemStreams = new HashSet<>();
+
+ SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, config,
+ inputSystemStreams, outputSystemStreams);
+
+ SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(config);
+
+ // Populate stream to system mapping config for input and output system streams
+ for (String source : inputSystemStreams) {
+ SqlIOConfig inputSystemStreamConfig = ioResolver.fetchSourceInfo(source);
+ newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
+ inputSystemStreamConfig.getSystemName());
+ newConfig.putAll(inputSystemStreamConfig.getConfig());
+ }
+
+ for (String sink : outputSystemStreams) {
+ SqlIOConfig outputSystemStreamConfig = ioResolver.fetchSinkInfo(sink);
newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()),
outputSystemStreamConfig.getSystemName());
newConfig.putAll(outputSystemStreamConfig.getConfig());
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
index 39ea092..643c82f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
@@ -24,9 +24,11 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.plan.Contexts;
@@ -49,6 +51,8 @@ import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.samza.SamzaException;
+import org.apache.samza.sql.interfaces.SamzaSqlDriver;
+import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
/**
@@ -63,11 +67,13 @@ public class SamzaSqlQueryParser {
private final List<String> sources;
private String selectQuery;
private String sink;
+ private String sql;
- public QueryInfo(String selectQuery, List<String> sources, String sink) {
+ public QueryInfo(String selectQuery, List<String> sources, String sink, String sql) {
this.selectQuery = selectQuery;
this.sink = sink;
this.sources = sources;
+ this.sql = sql;
}
public List<String> getSources() {
@@ -81,6 +87,10 @@ public class SamzaSqlQueryParser {
public String getSink() {
return sink;
}
+
+    /**
+     * @return the complete SQL statement that was passed to {@code parseQuery} to build this QueryInfo
+     */
+    public String getSql() {
+      return this.sql;
+    }
}
public static QueryInfo parseQuery(String sql) {
@@ -116,14 +126,18 @@ public class SamzaSqlQueryParser {
throw new SamzaException("Sql query is not of the expected format");
}
- return new QueryInfo(selectQuery, sources, sink);
+ return new QueryInfo(selectQuery, sources, sink, sql);
}
private static Planner createPlanner() {
Connection connection;
SchemaPlus rootSchema;
try {
- connection = DriverManager.getConnection("jdbc:calcite:");
+ JavaTypeFactory typeFactory = new SamzaSqlJavaTypeFactoryImpl();
+ SamzaSqlDriver driver = new SamzaSqlDriver(typeFactory);
+ DriverManager.deregisterDriver(DriverManager.getDriver("jdbc:calcite:"));
+ DriverManager.registerDriver(driver);
+ connection = driver.connect("jdbc:calcite:", new Properties());
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
rootSchema = calciteConnection.getRootSchema();
} catch (SQLException e) {
@@ -174,7 +188,6 @@ public class SamzaSqlQueryParser {
getSource(basicCall.operand(0), sourceList);
} else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) {
sourceList.addAll(getSourcesFromSelectQuery(basicCall.operand(0)));
- return;
}
} else if (node instanceof SqlSelect) {
getSource(((SqlSelect) node).getFrom(), sourceList);
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 7071b39..ac2c64d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -127,6 +127,7 @@ class JoinTranslator {
"stream_" + joinId)
.map(KV::getValue)
.join(table, joinFn);
+ // MessageStream<SamzaSqlRelMessage> outputStream = inputStream.join(table, joinFn);
context.registerMessageStream(join.getId(), outputStream);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
new file mode 100644
index 0000000..965338f
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
@@ -0,0 +1,117 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.table.Table;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Translator to translate the TableModify in relational graph to the corresponding output streams in the StreamGraph
+ * implementation
+ */
+class ModifyTranslator {
+
+  private final Map<String, SamzaRelConverter> relMsgConverters;
+  private final Map<String, SqlIOConfig> systemStreamConfig;
+
+  /**
+   * @param converters message converters keyed by source name; a sink must have an entry here to be translated
+   * @param ssc IO configs keyed by source name, used to resolve the sink's system, stream and optional table
+   */
+  ModifyTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlIOConfig> ssc) {
+    relMsgConverters = converters;
+    this.systemStreamConfig = ssc;
+  }
+
+  // OutputMapFunction converts SamzaSqlRelMessage to SamzaMessage in KV format
+  private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
+    // All the user-supplied functions are expected to be serializable in order to enable full serialization of user
+    // DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator
+    // initialization.
+    private transient SamzaRelConverter samzaMsgConverter;
+    private final String outputTopic;
+
+    OutputMapFunction(String outputTopic) {
+      this.outputTopic = outputTopic;
+    }
+
+    @Override
+    public void init(Config config, TaskContext taskContext) {
+      TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
+      this.samzaMsgConverter = context.getMsgConverter(outputTopic);
+    }
+
+    @Override
+    public KV<Object, Object> apply(SamzaSqlRelMessage message) {
+      return this.samzaMsgConverter.convertToSamzaMessage(message);
+    }
+  }
+
+  /**
+   * Sends the stream produced by the TableModify's input to the sink the TableModify names. The sink is an
+   * output stream, or a table when the sink's IO config carries a table descriptor.
+   *
+   * @param tableModify the INSERT node to translate
+   * @param context translator context holding the app descriptor, registered message streams and system descriptors
+   * @throws IllegalArgumentException if the sink has no registered converter or no IO config
+   * @throws SamzaException if the sink's table cannot be obtained from the application descriptor
+   */
+  void translate(final TableModify tableModify, final TranslatorContext context) {
+    StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
+    List<String> tableNameParts = tableModify.getTable().getQualifiedName();
+    String targetName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
+
+    Validate.isTrue(relMsgConverters.containsKey(targetName), String.format("Unknown source %s", targetName));
+
+    SqlIOConfig sinkConfig = systemStreamConfig.get(targetName);
+    // The converter map and the IO config map are separate; fail clearly instead of with an NPE further down.
+    Validate.isTrue(sinkConfig != null, String.format("No IO config found for source %s", targetName));
+
+    MessageStreamImpl<SamzaSqlRelMessage> stream =
+        (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(tableModify.getInput().getId());
+    MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(targetName));
+
+    Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
+    if (!tableDescriptor.isPresent()) {
+      // Only stream sinks need a system/output descriptor. Building them for table sinks would add an unused entry
+      // to the context's system descriptor map.
+      final String systemName = sinkConfig.getSystemName();
+      final String streamName = sinkConfig.getStreamName();
+      KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+      DelegatingSystemDescriptor sd =
+          context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
+      GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamName, noOpKVSerde);
+      outputStream.sendTo(streamAppDesc.getOutputStream(osd));
+    } else {
+      Table outputTable = streamAppDesc.getTable(tableDescriptor.get());
+      if (outputTable == null) {
+        String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
+        throw new SamzaException(msg);
+      }
+      outputStream.sendTo(outputTable);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index fe4d8da..3a35b97 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -20,10 +20,10 @@
package org.apache.samza.sql.translator;
import java.util.Map;
-import java.util.Optional;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
@@ -33,27 +33,13 @@ import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;;
-import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.table.Table;
import org.apache.samza.task.TaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -62,54 +48,56 @@ import org.slf4j.LoggerFactory;
* It then walks the relational graph and then populates the Samza's {@link StreamApplicationDescriptor} accordingly.
*/
public class QueryTranslator {
- private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class);
-
private final ScanTranslator scanTranslator;
+ private final ModifyTranslator modifyTranslator;
private final SamzaSqlApplicationConfig sqlConfig;
private final Map<String, SamzaRelConverter> converters;
- private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
- private transient SamzaRelConverter samzaMsgConverter;
- private final String outputTopic;
-
- OutputMapFunction(String outputTopic) {
- this.outputTopic = outputTopic;
- }
-
- @Override
- public void init(Config config, TaskContext taskContext) {
- TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
- this.samzaMsgConverter = context.getMsgConverter(outputTopic);
- }
-
- @Override
- public KV<Object, Object> apply(SamzaSqlRelMessage message) {
- return this.samzaMsgConverter.convertToSamzaMessage(message);
- }
- }
-
public QueryTranslator(SamzaSqlApplicationConfig sqlConfig) {
this.sqlConfig = sqlConfig;
scanTranslator =
new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource());
+ modifyTranslator =
+ new ModifyTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getOutputSystemStreamConfigsBySource());
this.converters = sqlConfig.getSamzaRelConverters();
}
public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc) {
QueryPlanner planner =
- new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
+ new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(),
sqlConfig.getUdfMetadata());
+ final RelRoot relRoot = planner.plan(queryInfo.getSql());
+ translate(relRoot, appDesc);
+ }
+
+ public void translate(RelRoot relRoot, StreamApplicationDescriptor appDesc) {
final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig);
- final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
final TranslatorContext context = new TranslatorContext(appDesc, relRoot, executionContext, this.converters);
- final RelNode node = relRoot.project();
final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
+ final RelNode node = relRoot.project();
node.accept(new RelShuttleImpl() {
int windowId = 0;
int joinId = 0;
@Override
+ public RelNode visit(RelNode relNode) {
+ if (relNode instanceof TableModify) {
+ return visit((TableModify) relNode);
+ }
+ return super.visit(relNode);
+ }
+
+ private RelNode visit(TableModify modify) {
+ if (!modify.isInsert()) {
+ throw new SamzaException("Not a supported operation: " + modify.toString());
+ }
+ RelNode node = super.visit(modify);
+ modifyTranslator.translate(modify, context);
+ return node;
+ }
+
+ @Override
public RelNode visit(TableScan scan) {
RelNode node = super.visit(scan);
scanTranslator.translate(scan, context);
@@ -147,28 +135,6 @@ public class QueryTranslator {
}
});
- String sink = queryInfo.getSink();
- SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sink);
- MessageStreamImpl<SamzaSqlRelMessage> stream = (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId());
- MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(sink));
-
- Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
- if (!tableDescriptor.isPresent()) {
- KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
- String systemName = sinkConfig.getSystemName();
- DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
- GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde);
- outputStream.sendTo(appDesc.getOutputStream(osd));
- } else {
- Table outputTable = appDesc.getTable(tableDescriptor.get());
- if (outputTable == null) {
- String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
- LOG.error(msg);
- throw new SamzaException(msg);
- }
- outputStream.sendTo(outputTable);
- }
-
appDesc.withContextManager(new ContextManager() {
@Override
public void init(Config config, TaskContext taskContext) {
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 2dc28be..771a5d5 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -27,15 +27,15 @@ import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.task.TaskContext;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
/**
@@ -53,6 +53,9 @@ class ScanTranslator {
}
private static class ScanMapFunction implements MapFunction<KV<Object, Object>, SamzaSqlRelMessage> {
+ // All the user-supplied functions are expected to be serializable in order to enable full serialization of user
+ // DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator
+ // initialization.
private transient SamzaRelConverter msgConverter;
private final String streamName;
@@ -83,7 +86,8 @@ class ScanTranslator {
final String streamName = sqlIOConfig.getStreamName();
KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
- DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
+ DelegatingSystemDescriptor
+ sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde);
MessageStream<KV<Object, Object>> inputStream = streamAppDesc.getInputStream(isd);
MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName));
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
index cc339f1..2005c21 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
@@ -42,7 +42,7 @@ public class TestSamzaSqlTable {
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testDb.testTable.`$table` select id, name from testavro.SIMPLE1";
+ String sql1 = "Insert into testDb.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
@@ -58,7 +58,7 @@ public class TestSamzaSqlTable {
TestIOResolverFactory.TestTable.records.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
- String sql1 = "Insert into testDb.testTable.`$table` select id __key__, name from testavro.SIMPLE1";
+ String sql1 = "Insert into testDb.testTable.`$table`(id,name) select id __key__, name from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index dda0e14..46c0651 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -20,19 +20,25 @@
package org.apache.samza.sql.runner;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*;
+
public class TestSamzaSqlApplicationConfig {
@@ -42,8 +48,14 @@ public class TestSamzaSqlApplicationConfig {
config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1");
String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
int numUdfs = config.get(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES).split(",").length;
- SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
- Assert.assertEquals(1, samzaSqlApplicationConfig.getQueryInfo().size());
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toSet()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
Assert.assertEquals(numUdfs, samzaSqlApplicationConfig.getUdfMetadata().size());
Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size());
Assert.assertEquals(1, samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().size());
@@ -54,17 +66,23 @@ public class TestSamzaSqlApplicationConfig {
Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
-
try {
// Fail because no SQL config
- new SamzaSqlApplicationConfig(new MapConfig(config));
+ fetchSqlFromConfig(config);
Assert.fail();
} catch (SamzaException e) {
}
// Pass
config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, "Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1");
- new SamzaSqlApplicationConfig(new MapConfig(config));
+
+ List<String> sqlStmts = fetchSqlFromConfig(config);
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ new SamzaSqlApplicationConfig(new MapConfig(config),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toSet()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+
testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_IO_RESOLVER);
testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_UDF_RESOLVER);
@@ -85,7 +103,12 @@ public class TestSamzaSqlApplicationConfig {
"insert into testavro.Profile select * from testavro.SIMPLE1");
Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
- SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toSet()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet();
Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet();
@@ -99,14 +122,24 @@ public class TestSamzaSqlApplicationConfig {
private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) {
Map<String, String> badConfigs = new HashMap<>(config);
badConfigs.remove(configKey);
- new SamzaSqlApplicationConfig(new MapConfig(badConfigs));
+ List<String> sqlStmts = fetchSqlFromConfig(badConfigs);
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ new SamzaSqlApplicationConfig(new MapConfig(badConfigs),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toSet()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
}
private void testWithoutConfigShouldFail(Map<String, String> config, String configKey) {
Map<String, String> badConfigs = new HashMap<>(config);
badConfigs.remove(configKey);
try {
- new SamzaSqlApplicationConfig(new MapConfig(badConfigs));
+ List<String> sqlStmts = fetchSqlFromConfig(badConfigs);
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ new SamzaSqlApplicationConfig(new MapConfig(badConfigs),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toSet()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
Assert.fail();
} catch (IllegalArgumentException e) {
// swallow
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index 9fab5d5..1ac804e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -36,7 +36,7 @@ public class TestSamzaSqlApplicationRunner {
@Test
public void testComputeSamzaConfigs() {
Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
- String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1";
+ String sql1 = "Insert into testavro.outputTopic(id,long_value) select id, MyTest(id) as long_value from testavro.SIMPLE1";
configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql1);
configs.put(SamzaSqlApplicationRunner.RUNNER_CONFIG, SamzaSqlApplicationRunner.class.getName());
MapConfig samzaConfig = new MapConfig(configs);
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 676781c..458196f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -31,7 +31,6 @@ import java.util.stream.IntStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-import org.apache.calcite.avatica.util.ByteString;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.sql.avro.schemas.AddressRecord;
@@ -50,7 +49,6 @@ import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
-import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +71,7 @@ public class TestAvroSystemFactory implements SystemFactory {
+  // Fixed 16-byte tracking-id value. NOTE(review): the reference is final but the array contents
+  // are still mutable by any caller; presumably used as a default in generated test records — confirm.
public static final byte[] DEFAULT_TRACKING_ID_BYTES =
{76, 75, -24, 10, 33, -117, 24, -52, -110, -39, -5, 102, 65, 57, -62, -1};
+
+  // NOTE(review): public, static and mutable (non-final ArrayList), so it is shared across all
+  // users of this class; presumably it collects envelopes sent by the test producer. Confirm, and
+  // clear it between tests to avoid cross-test leakage.
public static List<OutgoingMessageEnvelope> messages = new ArrayList<>();
public static List<String> getPageKeyProfileNameJoin(int numMessages) {
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 14e2243..a96fd08 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -128,6 +128,9 @@ public class SamzaSqlTestConfig {
"testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro", "simpleOutputTopic"), SimpleRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
"testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString());
staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
http://git-wip-us.apache.org/repos/asf/samza/blob/dec16392/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
index 1723e0e..a84f347 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
+import org.apache.samza.sql.testutil.SqlFileParser;
import org.junit.Assert;
import org.junit.Test;