You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/03/22 00:58:17 UTC
[3/3] samza git commit: Add stream-table join support for samza sql
Add stream-table join support for samza sql
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Yi Pan <ni...@gmail.com>
Closes #425 from atoomula/join
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/956cf412
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/956cf412
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/956cf412
Branch: refs/heads/master
Commit: 956cf412a44812d54286fd9a0c4d167239387362
Parents: 2d7b0f5
Author: Aditya Toomula <at...@linkedin.com>
Authored: Wed Mar 21 17:58:07 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Mar 21 17:58:07 2018 -0700
----------------------------------------------------------------------
build.gradle | 5 +-
.../samza/runtime/LocalApplicationRunner.java | 1 +
.../org/apache/samza/config/StreamConfig.scala | 1 +
.../samza/sql/data/SamzaSqlCompositeKey.java | 81 ++++
.../sql/data/SamzaSqlExecutionContext.java | 4 +
.../samza/sql/data/SamzaSqlRelMessage.java | 29 +-
.../impl/ConfigBasedSourceResolverFactory.java | 50 ++-
.../samza/sql/interfaces/SourceResolver.java | 14 +-
.../sql/interfaces/SqlSystemSourceConfig.java | 129 +++++++
.../sql/interfaces/SqlSystemStreamConfig.java | 117 ------
.../apache/samza/sql/planner/QueryPlanner.java | 12 +-
.../sql/runner/SamzaSqlApplicationConfig.java | 21 +-
.../sql/runner/SamzaSqlApplicationRunner.java | 6 +-
.../samza/sql/testutil/SamzaSqlQueryParser.java | 31 +-
.../samza/sql/translator/FilterTranslator.java | 20 +-
.../samza/sql/translator/JoinTranslator.java | 279 ++++++++++++++
.../samza/sql/translator/ProjectTranslator.java | 4 +-
.../samza/sql/translator/QueryTranslator.java | 16 +-
.../SamzaSqlRelMessageJoinFunction.java | 121 ++++++
.../samza/sql/translator/ScanTranslator.java | 12 +-
.../apache/samza/sql/TestQueryTranslator.java | 370 +++++++++++++++++-
.../sql/TestSamzaSqlApplicationConfig.java | 8 +-
.../samza/sql/TestSamzaSqlQueryParser.java | 24 +-
.../sql/TestSamzaSqlRelMessageJoinFunction.java | 116 ++++++
.../samza/sql/TestSamzaSqlRelMessageSerde.java | 43 +++
.../apache/samza/sql/avro/schemas/Company.avsc | 39 ++
.../apache/samza/sql/avro/schemas/Company.java | 52 +++
.../sql/avro/schemas/EnrichedPageView.avsc | 45 +++
.../sql/avro/schemas/EnrichedPageView.java | 56 +++
.../apache/samza/sql/avro/schemas/PageView.avsc | 39 ++
.../apache/samza/sql/avro/schemas/PageView.java | 52 +++
.../apache/samza/sql/avro/schemas/Profile.avsc | 45 +++
.../apache/samza/sql/avro/schemas/Profile.java | 56 +++
.../samza/sql/e2e/TestSamzaSqlEndToEnd.java | 153 --------
.../samza/sql/system/SimpleSystemAdmin.java | 11 +-
.../samza/sql/system/TestAvroSystemFactory.java | 95 ++++-
.../samza/sql/testutil/SamzaSqlTestConfig.java | 55 ++-
.../sql/testutil/TestSourceResolverFactory.java | 26 +-
samza-sql/src/test/resources/log4j.xml | 6 +
.../test/samzasql/TestSamzaSqlEndToEnd.java | 385 +++++++++++++++++++
.../org/apache/samza/tools/SamzaSqlConsole.java | 10 +-
41 files changed, 2256 insertions(+), 383 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 44a6ccd..d96ec96 100644
--- a/build.gradle
+++ b/build.gradle
@@ -316,11 +316,12 @@ project(':samza-sql') {
dependencies {
compile project(':samza-api')
compile project(":samza-kafka_$scalaVersion")
+ compile project(":samza-kv-inmemory_$scalaVersion")
+ compile project(":samza-kv-rocksdb_$scalaVersion")
compile "org.apache.avro:avro:$avroVersion"
compile "org.apache.calcite:calcite-core:$calciteVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
- testCompile project(":samza-test_$scalaVersion")
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
@@ -753,6 +754,7 @@ project(":samza-test_$scalaVersion") {
compile project(":samza-kv-inmemory_$scalaVersion")
compile project(":samza-kv-rocksdb_$scalaVersion")
compile project(":samza-core_$scalaVersion")
+ compile project(":samza-sql")
runtime project(":samza-log4j")
runtime project(":samza-yarn_$scalaVersion")
runtime project(":samza-kafka_$scalaVersion")
@@ -769,6 +771,7 @@ project(":samza-test_$scalaVersion") {
testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
+ testCompile project(":samza-sql").sourceSets.test.output
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 5c5ee84..9529581 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -153,6 +153,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
String executionPlanJson = plan.getPlanAsJson();
writePlanJsonFile(executionPlanJson);
+ LOG.info("Execution Plan: \n" + executionPlanJson);
// 2. create the necessary streams
// TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 31f9b92..db86969 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -50,6 +50,7 @@ object StreamConfig {
val IS_BOUNDED_FOR_STREAM_ID = STREAM_ID_PREFIX + IS_BOUNDED
val PRIORITY_FOR_STREAM_ID = STREAM_ID_PREFIX + PRIORITY
val CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID = STREAM_ID_PREFIX + CONSUMER_OFFSET_DEFAULT
+ val BOOTSTRAP_FOR_STREAM_ID = STREAM_ID_PREFIX + BOOTSTRAP
implicit def Config2Stream(config: Config) = new StreamConfig(config)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
new file mode 100644
index 0000000..f646d9a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java
@@ -0,0 +1,81 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * A serializable class that holds different key parts.
+ */
+public class SamzaSqlCompositeKey implements Serializable {
+
+ @JsonProperty("keyParts")
+ private ArrayList<Object> keyParts;
+ private int hashCode;
+
+ @JsonCreator
+ public SamzaSqlCompositeKey(@JsonProperty("keyParts") List<Object> keyParts) {
+ this.keyParts = new ArrayList<>(keyParts);
+ hashCode = keyParts.hashCode();
+ }
+
+ /**
+ * Get the key parts that make up this composite key.
+ * @return the list of key parts held by this composite key
+ */
+ @JsonProperty("keyParts")
+ public ArrayList<Object> getKeyParts() {
+ return keyParts;
+ }
+
+ @Override
+ public String toString() {
+ return String.join(", ", Arrays.toString(keyParts.toArray()));
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return this == o || o != null && getClass() == o.getClass() && keyParts.equals(((SamzaSqlCompositeKey) o).keyParts);
+ }
+
+ /**
+ * Create the SamzaSqlCompositeKey from the rel message.
+ * @param message Represents the samza sql rel message.
+ * @param relIdx list of keys in the form of field indices within the rel message.
+ */
+ public static SamzaSqlCompositeKey createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) {
+ ArrayList<Object> keyParts = new ArrayList<>();
+ for (int idx : relIdx) {
+ keyParts.add(message.getFieldValues().get(idx));
+ }
+ return new SamzaSqlCompositeKey(keyParts);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
index 88bcb61..b0c30dd 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
@@ -58,4 +58,8 @@ public class SamzaSqlExecutionContext {
scalarUdf.init(udfConfig);
return scalarUdf;
}
+
+ public SamzaSqlApplicationConfig getSamzaSqlApplicationConfig() {
+ return sqlConfig;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index 452a32c..b54634f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -19,10 +19,12 @@
package org.apache.samza.sql.data;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang.Validate;
+import org.codehaus.jackson.annotate.JsonProperty;
/**
@@ -31,15 +33,19 @@ import org.apache.commons.lang.Validate;
* their associated column names. Right now we donot store any other metadata other than the column name in the
* SamzaSqlRelationalMessage, In future if we find a need, we could add additional column ddl metadata around
* primary Key, nullability, etc.
+ * TODO: SAMZA-1619 Support serialization of nested SamzaSqlRelMessage.
*/
-public class SamzaSqlRelMessage {
+public class SamzaSqlRelMessage implements Serializable {
public static final String KEY_NAME = "__key__";
- private final List<Object> fieldValues = new ArrayList<>();
- private final List<String> fieldNames = new ArrayList<>();
private final Object key;
+ @JsonProperty("fieldNames")
+ private final List<String> fieldNames;
+ @JsonProperty("fieldValues")
+ private final List<Object> fieldValues;
+
/**
* Creates a {@link SamzaSqlRelMessage} from the list of relational fields and values.
* If the field list contains KEY, then it extracts the key out of the fields to creates a
@@ -49,16 +55,20 @@ public class SamzaSqlRelMessage {
* delete change capture event in the stream or because of the result of the outer join or the fields
* themselves are null in the original stream.
*/
- public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues) {
+ public SamzaSqlRelMessage(@JsonProperty("fieldNames") List<String> fieldNames,
+ @JsonProperty("fieldValues") List<Object> fieldValues) {
Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
+ this.fieldNames = new ArrayList<>();
+ this.fieldValues = new ArrayList<>();
+
int keyIndex = fieldNames.indexOf(KEY_NAME);
Object key = null;
if (keyIndex != -1) {
key = fieldValues.get(keyIndex);
}
-
this.key = key;
+
this.fieldNames.addAll(fieldNames);
this.fieldValues.addAll(fieldValues);
}
@@ -74,10 +84,15 @@ public class SamzaSqlRelMessage {
*/
public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fieldValues) {
Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
+
+ this.fieldNames = new ArrayList<>();
+ this.fieldValues = new ArrayList<>();
+
this.key = key;
this.fieldNames.add(KEY_NAME);
- this.fieldNames.addAll(fieldNames);
this.fieldValues.add(key);
+
+ this.fieldNames.addAll(fieldNames);
this.fieldValues.addAll(fieldValues);
}
@@ -85,10 +100,12 @@ public class SamzaSqlRelMessage {
* Get the field names of all the columns in the relational message.
* @return the field names of all columns.
*/
+ @JsonProperty("fieldNames")
public List<String> getFieldNames() {
return fieldNames;
}
+ @JsonProperty("fieldValues")
public List<Object> getFieldValues() {
return this.fieldValues;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
index a2d8b0c..5348d3d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
@@ -23,14 +23,16 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.sql.interfaces.SourceResolver;
import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Source Resolver implementation that uses static config to return a config corresponding to a system stream.
- * This Source resolver implementation supports sources of type {systemName}.{streamName}
+ * This Source resolver implementation supports sources of type {systemName}.{streamName}[.$table]
+ * {systemName}.{streamName} indicates a stream
+ * {systemName}.{streamName}.$table indicates a table
*/
public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
@@ -44,6 +46,7 @@ public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
}
private class ConfigBasedSourceResolver implements SourceResolver {
+ private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
private final Config config;
public ConfigBasedSourceResolver(Config config) {
@@ -51,19 +54,52 @@ public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
}
@Override
- public SqlSystemStreamConfig fetchSourceInfo(String source) {
+ public SqlSystemSourceConfig fetchSourceInfo(String source) {
String[] sourceComponents = source.split("\\.");
+ boolean isTable = false;
+
+ // This source resolver expects sources of format {systemName}.{streamName}[.$table]
+ // * First source part is always system name.
+ // * The last source part could be either a "$table" keyword or stream name. If it is "$table", then stream name
+ // should be the one before the last source part.
+ int endIdx = sourceComponents.length - 1;
+ int streamIdx = endIdx;
+ boolean invalidQuery = false;
- // This source resolver expects sources of format {systemName}.{streamName}
if (sourceComponents.length != 2) {
- String msg = String.format("Source %s is not of the format {systemName}.{streamName{", source);
+ if (sourceComponents.length != 3 ||
+ !sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+ invalidQuery = true;
+ }
+ } else {
+ if (sourceComponents[0].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD) ||
+ sourceComponents[1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+ invalidQuery = true;
+ }
+ }
+
+ if (invalidQuery) {
+ String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", source,
+ SAMZA_SQL_QUERY_TABLE_KEYWORD);
LOG.error(msg);
throw new SamzaException(msg);
}
+
+ if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
+ isTable = true;
+ streamIdx = endIdx - 1;
+ }
+
String systemName = sourceComponents[0];
- String streamName = sourceComponents[1];
+ String streamName = sourceComponents[streamIdx];
+
+ return new SqlSystemSourceConfig(systemName, streamName, fetchSystemConfigs(systemName), isTable);
+ }
- return new SqlSystemStreamConfig(systemName, streamName, fetchSystemConfigs(systemName));
+ @Override
+ public boolean isTable(String sourceName) {
+ String[] sourceComponents = sourceName.split("\\.");
+ return sourceComponents[sourceComponents.length - 1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD);
}
private Config fetchSystemConfigs(String systemName) {
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
index ac3fd31..c161a0d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
@@ -20,7 +20,7 @@
package org.apache.samza.sql.interfaces;
/**
- * Source Resolvers are used by Samza Sql application to fetch the {@link SqlSystemStreamConfig} corresponding to the source.
+ * Source Resolvers are used by Samza Sql application to fetch the {@link SqlSystemSourceConfig} corresponding to the source.
*/
public interface SourceResolver {
/**
@@ -30,5 +30,15 @@ public interface SourceResolver {
* @return
* System stream config corresponding to the source.
*/
- SqlSystemStreamConfig fetchSourceInfo(String sourceName);
+ SqlSystemSourceConfig fetchSourceInfo(String sourceName);
+
+ /**
+ * Returns whether a given source is a table. Different source resolvers may use different notations in the source
+ * name to denote a table, e.g. system.stream.$table
+ * @param sourceName
+ * source that needs to be checked if it is a table.
+ * @return
+ * true if the source is a table, else false.
+ */
+ boolean isTable(String sourceName);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
new file mode 100644
index 0000000..02ec18a
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemSourceConfig.java
@@ -0,0 +1,129 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Configs associated with a system source. Both streams and table sources are supported.
+ * For now, only local tables are supported.
+ */
+public class SqlSystemSourceConfig {
+
+ public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
+ public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
+
+ private final String systemName;
+
+ private final String streamName;
+
+ private final String samzaRelConverterName;
+ private final SystemStream systemStream;
+
+ private final String source;
+ private String relSchemaProviderName;
+
+ private Config config;
+
+ private List<String> sourceParts;
+
+ public SqlSystemSourceConfig(String systemName, String streamName, Config systemConfig) {
+ this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig, false);
+ }
+
+ public SqlSystemSourceConfig(String systemName, String streamName, Config systemConfig, boolean isTable) {
+ this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig, isTable);
+ }
+
+ public SqlSystemSourceConfig(String systemName, String streamName, List<String> sourceParts,
+ Config systemConfig, boolean isTable) {
+
+
+ HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
+ this.systemName = systemName;
+ this.streamName = streamName;
+ this.source = getSourceFromSourceParts(sourceParts);
+ this.sourceParts = sourceParts;
+ this.systemStream = new SystemStream(systemName, streamName);
+
+ samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
+ Validate.notEmpty(samzaRelConverterName,
+ String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+
+ relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
+
+ // Removing the Samza SQL specific configs to get the remaining Samza configs.
+ streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
+ streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
+
+ // Currently, only local tables are supported, and all tables are assumed to be local tables.
+ if (isTable) {
+ streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamName), "true");
+ streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamName), "oldest");
+ }
+
+ config = new MapConfig(streamConfigs);
+ }
+
+ public static String getSourceFromSourceParts(List<String> sourceParts) {
+ return Joiner.on(".").join(sourceParts);
+ }
+
+ public List<String> getSourceParts() {
+ return sourceParts;
+ }
+
+ public String getSystemName() {
+ return systemName;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public String getSamzaRelConverterName() {
+ return samzaRelConverterName;
+ }
+
+ public String getRelSchemaProviderName() {
+ return relSchemaProviderName;
+ }
+
+ public SystemStream getSystemStream() {
+ return systemStream;
+ }
+
+ public Config getConfig() {
+ return config;
+ }
+
+ public String getSource() {
+ return source;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
deleted file mode 100644
index d8965a4..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.interfaces;
-
-import com.google.common.base.Joiner;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import org.apache.commons.lang.Validate;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * Configs associated with a system stream.
- */
-public class SqlSystemStreamConfig {
-
- public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
- public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
-
- private final String systemName;
-
- private final String streamName;
-
- private final String samzaRelConverterName;
- private final SystemStream systemStream;
-
- private final String source;
- private String relSchemaProviderName;
-
- private Config config;
-
- private List<String> sourceParts;
-
- public SqlSystemStreamConfig(String systemName, String streamName, Config systemConfig) {
- this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig);
- }
-
- public SqlSystemStreamConfig(String systemName, String streamName, List<String> sourceParts,
- Config systemConfig) {
-
-
- HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
- this.systemName = systemName;
- this.streamName = streamName;
- this.source = getSourceFromSourceParts(sourceParts);
- this.sourceParts = sourceParts;
- this.systemStream = new SystemStream(systemName, streamName);
-
- samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
- Validate.notEmpty(samzaRelConverterName,
- String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
-
- relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
-
- // Removing the Samza SQL specific configs to get the remaining Samza configs.
- streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
- streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
-
- config = new MapConfig(streamConfigs);
- }
-
- public static String getSourceFromSourceParts(List<String> sourceParts) {
- return Joiner.on(".").join(sourceParts);
- }
-
- public List<String> getSourceParts() {
- return sourceParts;
- }
-
- public String getSystemName() {
- return systemName;
- }
-
- public String getStreamName() {
- return streamName;
- }
-
- public String getSamzaRelConverterName() {
- return samzaRelConverterName;
- }
-
- public String getRelSchemaProviderName() {
- return relSchemaProviderName;
- }
-
- public SystemStream getSystemStream() {
- return systemStream;
- }
-
- public Config getConfig() {
- return config;
- }
-
- public String getSource() {
- return source;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 2b67f18..f21eccf 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -55,7 +55,7 @@ import org.apache.calcite.tools.Planner;
import org.apache.samza.SamzaException;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
import org.apache.samza.sql.interfaces.UdfMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,11 +72,11 @@ public class QueryPlanner {
// Mapping between the source to the RelSchemaProvider corresponding to the source.
private final Map<String, RelSchemaProvider> relSchemaProviders;
- // Mapping between the source to the SqlSystemStreamConfig corresponding to the source.
- private final Map<String, SqlSystemStreamConfig> systemStreamConfigBySource;
+ // Mapping between the source to the SqlSystemSourceConfig corresponding to the source.
+ private final Map<String, SqlSystemSourceConfig> systemStreamConfigBySource;
public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
- Map<String, SqlSystemStreamConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
+ Map<String, SqlSystemSourceConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
this.relSchemaProviders = relSchemaProviders;
this.systemStreamConfigBySource = systemStreamConfigBySource;
this.udfMetadata = udfMetadata;
@@ -88,7 +88,7 @@ public class QueryPlanner {
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
- for (SqlSystemStreamConfig ssc : systemStreamConfigBySource.values()) {
+ for (SqlSystemSourceConfig ssc : systemStreamConfigBySource.values()) {
SchemaPlus previousLevelSchema = rootSchema;
List<String> sourceParts = ssc.getSourceParts();
RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());
@@ -96,7 +96,7 @@ public class QueryPlanner {
for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) {
String sourcePart = sourceParts.get(sourcePartIndex);
if (sourcePartIndex < sourceParts.size() - 1) {
- SchemaPlus sourcePartSchema = rootSchema.getSubSchema(sourcePart);
+ SchemaPlus sourcePartSchema = previousLevelSchema.getSubSchema(sourcePart);
if (sourcePartSchema == null) {
sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 227a0f1..aeb7f35 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -42,7 +42,7 @@ import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
import org.apache.samza.sql.interfaces.SourceResolver;
import org.apache.samza.sql.interfaces.SourceResolverFactory;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
import org.apache.samza.sql.interfaces.UdfMetadata;
import org.apache.samza.sql.interfaces.UdfResolver;
import org.apache.samza.sql.testutil.JsonUtil;
@@ -50,7 +50,6 @@ import org.apache.samza.sql.testutil.ReflectionUtils;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser.QueryInfo;
import org.apache.samza.sql.testutil.SqlFileParser;
-import org.apache.samza.system.SystemStream;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,8 +85,8 @@ public class SamzaSqlApplicationConfig {
private final Collection<UdfMetadata> udfMetadata;
- private final Map<String, SqlSystemStreamConfig> inputSystemStreamConfigBySource;
- private final Map<String, SqlSystemStreamConfig> outputSystemStreamConfigsBySource;
+ private final Map<String, SqlSystemSourceConfig> inputSystemStreamConfigBySource;
+ private final Map<String, SqlSystemSourceConfig> outputSystemStreamConfigsBySource;
private final List<String> sql;
@@ -109,7 +108,7 @@ public class SamzaSqlApplicationConfig {
.flatMap(Collection::stream)
.collect(Collectors.toMap(Function.identity(), sourceResolver::fetchSourceInfo));
- Set<SqlSystemStreamConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
+ Set<SqlSystemSourceConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
outputSystemStreamConfigsBySource = queryInfo.stream()
.map(QueryInfo::getOutputSource)
@@ -117,13 +116,13 @@ public class SamzaSqlApplicationConfig {
systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
relSchemaProvidersBySource = systemStreamConfigs.stream()
- .collect(Collectors.toMap(SqlSystemStreamConfig::getSource,
+ .collect(Collectors.toMap(SqlSystemSourceConfig::getSource,
x -> initializePlugin("RelSchemaProvider", x.getRelSchemaProviderName(), staticConfig,
CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
(o, c) -> ((RelSchemaProviderFactory) o).create(x.getSystemStream(), c))));
samzaRelConvertersBySource = systemStreamConfigs.stream()
- .collect(Collectors.toMap(SqlSystemStreamConfig::getSource,
+ .collect(Collectors.toMap(SqlSystemSourceConfig::getSource,
x -> initializePlugin("SamzaRelConverter", x.getSamzaRelConverterName(), staticConfig,
CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
relSchemaProvidersBySource.get(x.getSource()), c))));
@@ -226,11 +225,11 @@ public class SamzaSqlApplicationConfig {
return udfMetadata;
}
- public Map<String, SqlSystemStreamConfig> getInputSystemStreamConfigBySource() {
+ public Map<String, SqlSystemSourceConfig> getInputSystemStreamConfigBySource() {
return inputSystemStreamConfigBySource;
}
- public Map<String, SqlSystemStreamConfig> getOutputSystemStreamConfigsBySource() {
+ public Map<String, SqlSystemSourceConfig> getOutputSystemStreamConfigsBySource() {
return outputSystemStreamConfigsBySource;
}
@@ -241,4 +240,8 @@ public class SamzaSqlApplicationConfig {
public Map<String, RelSchemaProvider> getRelSchemaProviders() {
return relSchemaProvidersBySource;
}
+
+ public SourceResolver getSourceResolver() { // Accessor for the sourceResolver field of this config; QueryTranslator reads it to build a JoinTranslator.
+ return sourceResolver;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 83928e1..f54ca42 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -32,7 +32,7 @@ import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.runtime.RemoteApplicationRunner;
import org.apache.samza.sql.interfaces.SourceResolver;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,13 +82,13 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
for (SamzaSqlQueryParser.QueryInfo query : queryInfo) {
// Populate stream to system mapping config for input and output system streams
for (String inputSource : query.getInputSources()) {
- SqlSystemStreamConfig inputSystemStreamConfig = sourceResolver.fetchSourceInfo(inputSource);
+ SqlSystemSourceConfig inputSystemStreamConfig = sourceResolver.fetchSourceInfo(inputSource);
newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
inputSystemStreamConfig.getSystemName());
newConfig.putAll(inputSystemStreamConfig.getConfig());
}
- SqlSystemStreamConfig outputSystemStreamConfig = sourceResolver.fetchSourceInfo(query.getOutputSource());
+ SqlSystemSourceConfig outputSystemStreamConfig = sourceResolver.fetchSourceInfo(query.getOutputSource());
newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()),
outputSystemStreamConfig.getSystemName());
newConfig.putAll(outputSystemStreamConfig.getConfig());
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
index dd5f3bc..faf903a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/testutil/SamzaSqlQueryParser.java
@@ -23,7 +23,6 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -102,14 +101,14 @@ public class SamzaSqlQueryParser {
String outputSource;
String selectQuery;
- String inputSource;
+ ArrayList<String> inputSources;
if (sqlNode instanceof SqlInsert) {
SqlInsert sqlInsert = ((SqlInsert) sqlNode);
outputSource = sqlInsert.getTargetTable().toString();
if (sqlInsert.getSource() instanceof SqlSelect) {
SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource();
selectQuery = m.group(2);
- inputSource = getInputFromSelectQuery(sqlSelect);
+ inputSources = getInputsFromSelectQuery(sqlSelect);
} else {
throw new SamzaException("Sql query is not of the expected format");
}
@@ -117,7 +116,7 @@ public class SamzaSqlQueryParser {
throw new SamzaException("Sql query is not of the expected format");
}
- return new QueryInfo(selectQuery, Collections.singletonList(inputSource), outputSource);
+ return new QueryInfo(selectQuery, inputSources, outputSource);
}
private static Planner createPlanner() {
@@ -147,17 +146,17 @@ public class SamzaSqlQueryParser {
return Frameworks.getPlanner(frameworkConfig);
}
- private static String getInputFromSelectQuery(SqlSelect sqlSelect) {
+ private static ArrayList<String> getInputsFromSelectQuery(SqlSelect sqlSelect) {
ArrayList<String> input = new ArrayList<>();
getInput(sqlSelect.getFrom(), input);
- if (input.size() != 1) {
+ if (input.size() < 1) {
throw new SamzaException("Unsupported query " + sqlSelect);
}
- return input.get(0);
+ return input;
}
- private static void getInput(SqlNode node, ArrayList<String> inputSource) {
+ private static void getInput(SqlNode node, ArrayList<String> inputSourceList) {
if (node instanceof SqlJoin) {
SqlJoin joinNode = (SqlJoin) node;
ArrayList<String> inputsLeft = new ArrayList<>();
@@ -165,24 +164,20 @@ public class SamzaSqlQueryParser {
getInput(joinNode.getLeft(), inputsLeft);
getInput(joinNode.getRight(), inputsRight);
- if (!inputsLeft.isEmpty() && !inputsRight.isEmpty()) {
- throw new SamzaException("Joins on two entities are not supported yet");
- }
-
- inputSource.addAll(inputsLeft);
- inputSource.addAll(inputsRight);
+ inputSourceList.addAll(inputsLeft);
+ inputSourceList.addAll(inputsRight);
} else if (node instanceof SqlIdentifier) {
- inputSource.add(node.toString());
+ inputSourceList.add(node.toString());
} else if (node instanceof SqlBasicCall) {
SqlBasicCall basicCall = ((SqlBasicCall) node);
if (basicCall.getOperator() instanceof SqlAsOperator) {
- getInput(basicCall.operand(0), inputSource);
+ getInput(basicCall.operand(0), inputSourceList);
} else if (basicCall.getOperator() instanceof SqlUnnestOperator && basicCall.operand(0) instanceof SqlSelect) {
- inputSource.add(getInputFromSelectQuery(basicCall.operand(0)));
+ inputSourceList.addAll(getInputsFromSelectQuery(basicCall.operand(0)));
return;
}
} else if (node instanceof SqlSelect) {
- getInput(((SqlSelect) node).getFrom(), inputSource);
+ getInput(((SqlSelect) node).getFrom(), inputSourceList);
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index 686ac15..798f0b3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -22,7 +22,10 @@ package org.apache.samza.sql.translator;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexNode;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.sql.data.Expression;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
@@ -34,16 +37,23 @@ import org.slf4j.LoggerFactory;
* Translator to translate the LogicalFilter node in the relational graph to the corresponding StreamGraph
* implementation
*/
-public class FilterTranslator {
+class FilterTranslator {
private static final Logger log = LoggerFactory.getLogger(FilterTranslator.class);
- public void translate(final LogicalFilter filter, final TranslatorContext context) {
+ void translate(final LogicalFilter filter, final TranslatorContext context) {
MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(filter.getInput().getId());
+ MessageStream<SamzaSqlRelMessage> outputStream = translateFilter(inputStream, filter.getInputs(),
+ filter.getCondition(), context);
+ context.registerMessageStream(filter.getId(), outputStream);
+ }
+
+ static MessageStream<SamzaSqlRelMessage> translateFilter(MessageStream<SamzaSqlRelMessage> inputStream,
+ List<RelNode> inputs, RexNode condition, final TranslatorContext context) {
Expression expr =
- context.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
+ context.getExpressionCompiler().compile(inputs, Collections.singletonList(condition));
- MessageStream<SamzaSqlRelMessage> outputStream = inputStream.filter(message -> {
+ return inputStream.filter(message -> {
Object[] result = new Object[1];
expr.execute(context.getExecutionContext(), context.getDataContext(), message.getFieldValues().toArray(), result);
if (result.length > 0 && result[0] instanceof Boolean) {
@@ -56,7 +66,5 @@ public class FilterTranslator {
return false;
}
});
-
- context.registerMessageStream(filter.getId(), outputStream);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
new file mode 100644
index 0000000..70c1968
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -0,0 +1,279 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.table.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*;
+
+
+/**
+ * Translator to translate the LogicalJoin node in the relational graph to the corresponding StreamGraph
+ * implementation.
+ * Join is supported with the following caveats:
+ * 0. Only local tables are supported. Remote/composite tables are not yet supported.
+ * 1. Only stream-table joins are supported. No stream-stream joins.
+ * 2. Only Equi-joins are supported. No theta-joins.
+ * 3. Inner joins, Left and Right outer joins are supported. No cross joins, full outer joins or natural joins.
+ * 4. Join condition with a constant is not supported.
+ * 5. Compound join condition with only AND operator is supported. AND operator with a constant is not supported. No
+ * support for OR operator or any other operator in the join condition.
+ * 6. Join condition with UDFs is not supported. Eg: udf1(a.key) = udf2(b.key) is not supported.
+ *
+ * It is assumed that the stream denoted as 'table' is already partitioned by the key(s) specified in the join
+ * condition. We do not repartition the table as bootstrap semantic is not propagated to the intermediate streams.
+ * Please refer SAMZA-1613 for more details on this. But we always repartition the stream by the key(s) specified in
+ * the join condition.
+ */
+class JoinTranslator {
+
+ private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
+ private int joinId;
+ private SourceResolver sourceResolver;
+
+ JoinTranslator(int joinId, SourceResolver sourceResolver) {
+ this.joinId = joinId;
+ this.sourceResolver = sourceResolver;
+ }
+
+ void translate(final LogicalJoin join, final TranslatorContext context) {
+
+ // Do the validation of join query
+ validateJoinQuery(join);
+
+ boolean isTablePosOnRight = isTable(join.getRight());
+ List<Integer> streamKeyIds = new LinkedList<>();
+ List<Integer> tableKeyIds = new LinkedList<>();
+
+ // Fetch the stream and table indices corresponding to the fields given in the join condition.
+ populateStreamAndTableKeyIds(((RexCall) join.getCondition()).getOperands(), join, isTablePosOnRight, streamKeyIds,
+ tableKeyIds);
+
+ JsonSerdeV2<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
+ JsonSerdeV2<SamzaSqlRelMessage> relMsgSerde = new JsonSerdeV2<>(SamzaSqlRelMessage.class);
+
+ Table table = loadLocalTable(isTablePosOnRight, tableKeyIds, keySerde, relMsgSerde, join, context);
+
+ MessageStream<SamzaSqlRelMessage> inputStream =
+ isTablePosOnRight ?
+ context.getMessageStream(join.getLeft().getId()) : context.getMessageStream(join.getRight().getId());
+
+ List<String> streamFieldNames = (isTablePosOnRight ? join.getLeft() : join.getRight()).getRowType().getFieldNames();
+ List<String> tableFieldNames = (isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames();
+ Validate.isTrue(streamKeyIds.size() == tableKeyIds.size());
+ log.info("Joining on the following Stream and Table field(s): ");
+ for (int i = 0; i < streamKeyIds.size(); i++) {
+ log.info(streamFieldNames.get(streamKeyIds.get(i)) + " with " + tableFieldNames.get(tableKeyIds.get(i)));
+ }
+
+ SamzaSqlRelMessageJoinFunction joinFn =
+ new SamzaSqlRelMessageJoinFunction(join.getJoinType(), isTablePosOnRight, streamKeyIds, streamFieldNames,
+ tableFieldNames);
+
+ // Always re-partition the messages from the input stream by the composite key and then join the messages
+ // with the table.
+ MessageStream<SamzaSqlRelMessage> outputStream =
+ inputStream
+ .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds),
+ m -> m,
+ KVSerde.of(keySerde, relMsgSerde),
+ "stream_" + joinId)
+ .map(KV::getValue)
+ .join(table, joinFn);
+
+ context.registerMessageStream(join.getId(), outputStream);
+ }
+
+ private void validateJoinQuery(LogicalJoin join) {
+ JoinRelType joinRelType = join.getJoinType();
+
+ if (joinRelType.compareTo(JoinRelType.INNER) != 0 && joinRelType.compareTo(JoinRelType.LEFT) != 0
+ && joinRelType.compareTo(JoinRelType.RIGHT) != 0) {
+ throw new SamzaException("Query with only INNER and LEFT/RIGHT OUTER join are supported.");
+ }
+
+ boolean isTablePosOnLeft = isTable(join.getLeft());
+ boolean isTablePosOnRight = isTable(join.getRight());
+
+ if (!isTablePosOnLeft && !isTablePosOnRight) {
+ throw new SamzaException("Invalid query with both sides of join being denoted as 'stream'. "
+ + "Stream-stream join is not yet supported. " + dumpRelPlanForNode(join));
+ }
+
+ if (isTablePosOnLeft && isTablePosOnRight) {
+ throw new SamzaException("Invalid query with both sides of join being denoted as 'table'. " +
+ dumpRelPlanForNode(join));
+ }
+
+ if (joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnLeft && !isTablePosOnRight) {
+ throw new SamzaException("Invalid query for outer left join. Left side of the join should be a 'stream' and "
+ + "right side of join should be a 'table'. " + dumpRelPlanForNode(join));
+ }
+
+ if (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && isTablePosOnRight && !isTablePosOnLeft) {
+ throw new SamzaException("Invalid query for outer right join. Left side of the join should be a 'table' and "
+ + "right side of join should be a 'stream'. " + dumpRelPlanForNode(join));
+ }
+
+ validateJoinCondition(join.getCondition());
+ }
+
+ private void validateJoinCondition(RexNode operand) {
+ if (!(operand instanceof RexCall)) {
+ throw new SamzaException("SQL Query is not supported. Join condition operand " + operand +
+ " is of type " + operand.getClass());
+ }
+
+ RexCall condition = (RexCall) operand;
+
+ if (condition.isAlwaysTrue()) {
+ throw new SamzaException("Query results in a cross join, which is not supported. Please optimize the query."
+ + " It is expected that the joins should include JOIN ON operator in the sql query.");
+ }
+
+ if (condition.getKind() != SqlKind.EQUALS && condition.getKind() != SqlKind.AND) {
+ throw new SamzaException("Only equi-joins and AND operator is supported in join condition.");
+ }
+ }
+
+ // Fetch the stream and table key indices corresponding to the fields given in the join condition by parsing through
+ // the condition. Stream and table key indices are populated in streamKeyIds and tableKeyIds respectively.
+ private void populateStreamAndTableKeyIds(List<RexNode> operands, final LogicalJoin join, boolean isTablePosOnRight,
+ List<Integer> streamKeyIds, List<Integer> tableKeyIds) {
+
+ // All non-leaf operands in the join condition should be expressions.
+ if (operands.get(0) instanceof RexCall) {
+ operands.forEach(operand -> {
+ validateJoinCondition(operand);
+ populateStreamAndTableKeyIds(((RexCall) operand).getOperands(), join, isTablePosOnRight, streamKeyIds, tableKeyIds);
+ });
+ return;
+ }
+
+ // We are at the leaf of the join condition. Only binary operators are supported.
+ Validate.isTrue(operands.size() == 2);
+
+ // Only reference operands are supported in row expressions and not constants.
+ // a.key = b.key is supported with a.key and b.key being reference operands.
+ // a.key = "constant" is not yet supported.
+ if (!(operands.get(0) instanceof RexInputRef) || !(operands.get(1) instanceof RexInputRef)) {
+ throw new SamzaException("SQL query is not supported. Join condition " + join.getCondition() + " should have "
+ + "reference operands but the types are " + operands.get(0).getClass() + " and " + operands.get(1).getClass());
+ }
+
+ // Join condition is commutative, meaning, a.key = b.key is equivalent to b.key = a.key.
+ // Calcite assigns the indices to the fields based on the order a and b are specified in
+ // the sql 'from' clause. Let's put the operand with smaller index in leftRef and larger
+ // index in rightRef so that the order of operands in the join condition is in the order
+ // the stream and table are specified in the 'from' clause.
+ RexInputRef leftRef = (RexInputRef) operands.get(0);
+ RexInputRef rightRef = (RexInputRef) operands.get(1);
+
+ // Let's validate the key used in the join condition.
+ validateKey(leftRef);
+ validateKey(rightRef);
+
+ if (leftRef.getIndex() > rightRef.getIndex()) {
+ RexInputRef tmpRef = leftRef;
+ leftRef = rightRef;
+ rightRef = tmpRef;
+ }
+
+ // Get the table key index and stream key index
+ int deltaKeyIdx = rightRef.getIndex() - join.getLeft().getRowType().getFieldCount();
+ streamKeyIds.add(isTablePosOnRight ? leftRef.getIndex() : deltaKeyIdx);
+ tableKeyIds.add(isTablePosOnRight ? deltaKeyIdx : leftRef.getIndex());
+ }
+
+ private void validateKey(RexInputRef ref) {
+ SqlTypeName sqlTypeName = ref.getType().getSqlTypeName();
+ // Only primitive types are supported in the key
+ if (sqlTypeName != SqlTypeName.BOOLEAN && sqlTypeName != SqlTypeName.TINYINT && sqlTypeName != SqlTypeName.SMALLINT
+ && sqlTypeName != SqlTypeName.INTEGER && sqlTypeName != SqlTypeName.CHAR && sqlTypeName != SqlTypeName.BIGINT
+ && sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT) {
+ log.error("Unsupported key type " + sqlTypeName + " used in join condition.");
+ throw new SamzaException("Unsupported key type used in join condition.");
+ }
+ }
+
+ private String dumpRelPlanForNode(RelNode relNode) {
+ return RelOptUtil.dumpPlan("Rel expression: ",
+ relNode, SqlExplainFormat.TEXT,
+ SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+ }
+
+ private boolean isTable(RelNode relNode) {
+ // NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of
+ // stream-table-table join, the left side of the join is join output, which we always
+ // assume to be a stream. The intermediate stream won't be an instance of EnumerableTableScan.
+ return relNode instanceof EnumerableTableScan &&
+ sourceResolver.isTable(String.join(".", relNode.getTable().getQualifiedName()));
+ }
+
+ private Table loadLocalTable(boolean isTablePosOnRight, List<Integer> tableKeyIds, Serde keySerde, Serde relMsgSerde,
+ LogicalJoin join, TranslatorContext context) {
+ MessageStream<SamzaSqlRelMessage> inputTable =
+ isTablePosOnRight ?
+ context.getMessageStream(join.getRight().getId()) : context.getMessageStream(join.getLeft().getId());
+
+ // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational
+ // message as the value. Send the messages from the input stream denoted as 'table' to the created table store.
+ Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
+ context.getStreamGraph()
+ .getTable(new RocksDbTableDescriptor("table_" + joinId)
+ .withSerde(KVSerde.of(keySerde, relMsgSerde)));
+
+ inputTable
+ .map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m))
+ .sendTo(table);
+
+ return table;
+ }
+
+ private void logStringAndTableJoinKeys(List<String> fieldNames, List<Integer> fieldIds) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index f5cc525..0f31fb6 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -42,11 +42,11 @@ import org.slf4j.LoggerFactory;
* Translator to translate the Project node in the relational graph to the corresponding StreamGraph
* implementation.
*/
-public class ProjectTranslator {
+class ProjectTranslator {
private static final Logger LOG = LoggerFactory.getLogger(ProjectTranslator.class);
- public void translate(final Project project, final TranslatorContext context) {
+ void translate(final Project project, final TranslatorContext context) {
MessageStream<SamzaSqlRelMessage> messageStream = context.getMessageStream(project.getInput().getId());
List<Integer> flattenProjects =
project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList());
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 87e37f4..b853537 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
@@ -32,7 +33,8 @@ import org.apache.samza.operators.StreamGraph;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
@@ -62,6 +64,7 @@ public class QueryTranslator {
final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
final TranslatorContext context = new TranslatorContext(streamGraph, relRoot, executionContext);
final RelNode node = relRoot.project();
+ final int[] joinId = new int[1];
node.accept(new RelShuttleImpl() {
@Override
@@ -84,9 +87,18 @@ public class QueryTranslator {
new ProjectTranslator().translate(project, context);
return node;
}
+
+ @Override
+ public RelNode visit(LogicalJoin join) { // Translates a LogicalJoin of the plan through JoinTranslator.
+ RelNode node = super.visit(join); // Default shuttle visit runs first — presumably so that both join inputs are translated and registered before the join; confirm against RelShuttleImpl.
+ joinId[0]++; // One-element array serves as a counter the anonymous class can mutate; incremented once per join visited.
+ SourceResolver sourceResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getSourceResolver();
+ new JoinTranslator(joinId[0], sourceResolver).translate(join, context); // JoinTranslator appends joinId to its "stream_" and "table_" ids.
+ return node;
+ }
});
- SqlSystemStreamConfig outputSystemConfig =
+ SqlSystemSourceConfig outputSystemConfig =
sqlConfig.getOutputSystemStreamConfigsBySource().get(queryInfo.getOutputSource());
SamzaRelConverter samzaMsgConverter = sqlConfig.getSamzaRelConverters().get(queryInfo.getOutputSource());
MessageStreamImpl<SamzaSqlRelMessage> stream =
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
new file mode 100644
index 0000000..69e4e09
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java
@@ -0,0 +1,121 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.translator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.sql.data.SamzaSqlCompositeKey;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*;
+
+
+/**
+ * Joins each incoming {@link SamzaSqlRelMessage} from a stream with the matching record of a table, using a
+ * {@link SamzaSqlCompositeKey} as the join key.
+ *
+ * <p>Accepted join shapes are {@code INNER}, {@code LEFT} with the table on the right, and {@code RIGHT} with the
+ * table on the left. Any other combination is rejected by the constructor.
+ */
+public class SamzaSqlRelMessageJoinFunction
+    implements StreamTableJoinFunction<SamzaSqlCompositeKey, SamzaSqlRelMessage, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>, SamzaSqlRelMessage> {
+
+  private static final Logger log = LoggerFactory.getLogger(SamzaSqlRelMessageJoinFunction.class);
+
+  private final JoinRelType joinRelType;
+  private final boolean isTablePosOnRight;
+  private final List<Integer> streamFieldIds;
+  // Table field names are used in the outer join when the table record is not found.
+  private final List<String> tableFieldNames;
+  // Stream and table field names concatenated in the order of the two join sides.
+  private final List<String> outFieldNames;
+
+  /**
+   * Creates the join function.
+   *
+   * @param joinRelType type of the join; must be {@code INNER}, {@code LEFT} (table on the right) or
+   *                    {@code RIGHT} (table on the left)
+   * @param isTablePosOnRight {@code true} when the table is the right side of the join, {@code false} when it is
+   *                          the left side
+   * @param streamFieldIds passed to {@code createSamzaSqlCompositeKey} to build the join key of a stream message
+   * @param streamFieldNames names of the stream message fields, used for the output message
+   * @param tableFieldNames names of the table record fields, used for the output message and to null-fill the
+   *                        table side when no record is found
+   * @throws IllegalArgumentException if the join type and table position combination is not supported
+   */
+  public SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean isTablePosOnRight,
+      List<Integer> streamFieldIds, List<String> streamFieldNames, List<String> tableFieldNames) {
+    // Enum constants are singletons, so identity comparison is the idiomatic (and null-safe) equality check.
+    Validate.isTrue((joinRelType == JoinRelType.LEFT && isTablePosOnRight)
+        || (joinRelType == JoinRelType.RIGHT && !isTablePosOnRight)
+        || joinRelType == JoinRelType.INNER,
+        "Unsupported join: type " + joinRelType + " with the table on the " + (isTablePosOnRight ? "right" : "left"));
+    this.joinRelType = joinRelType;
+    this.isTablePosOnRight = isTablePosOnRight;
+    this.streamFieldIds = streamFieldIds;
+    this.tableFieldNames = tableFieldNames;
+
+    // The output field order follows the position of the stream and the table in the join.
+    List<String> names = new ArrayList<>(streamFieldNames.size() + tableFieldNames.size());
+    if (isTablePosOnRight) {
+      names.addAll(streamFieldNames);
+      names.addAll(tableFieldNames);
+    } else {
+      names.addAll(tableFieldNames);
+      names.addAll(streamFieldNames);
+    }
+    this.outFieldNames = names;
+  }
+
+  /**
+   * Builds the joined output message for a stream message and the table record looked up for it.
+   *
+   * @param message the stream message
+   * @param record the table record found for the message key, or {@code null} when none was found
+   * @return a message holding the stream fields and the table fields in join order; the table fields are all
+   *         {@code null} when {@code record} is {@code null} in an outer join; {@code null} when {@code record} is
+   *         {@code null} in an inner join
+   */
+  @Override
+  public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) {
+
+    if (record == null && joinRelType == JoinRelType.INNER) {
+      // Building the composite key is per-message work; only pay for it when debug logging is enabled.
+      if (log.isDebugEnabled()) {
+        log.debug("Inner Join: Record not found for the message with key: {}", getMessageKey(message));
+      }
+      // Returning null would result in Join operator implementation to filter out the message.
+      return null;
+    }
+
+    // The resulting join output should be a SamzaSqlRelMessage containing the fields from both the stream message and
+    // table record. The order of stream message fields and table record fields are dictated by the position of stream
+    // and table in the 'from' clause of sql query.
+    List<Object> outFieldValues = new ArrayList<>(outFieldNames.size());
+
+    // If table position is on the right, add the stream message fields first
+    if (isTablePosOnRight) {
+      outFieldValues.addAll(message.getFieldValues());
+    }
+
+    addTableFieldValues(record, outFieldValues);
+
+    // If table position is on the left, add the stream message fields last
+    if (!isTablePosOnRight) {
+      outFieldValues.addAll(message.getFieldValues());
+    }
+
+    return new SamzaSqlRelMessage(outFieldNames, outFieldValues);
+  }
+
+  /**
+   * Appends the table side of the join to {@code outFieldValues}: the record's field values, or one {@code null}
+   * per table field when the record was not found (which can happen for outer joins).
+   */
+  private void addTableFieldValues(KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record, List<Object> outFieldValues) {
+    if (record != null) {
+      outFieldValues.addAll(record.getValue().getFieldValues());
+      return;
+    }
+    for (int i = 0; i < tableFieldNames.size(); i++) {
+      outFieldValues.add(null);
+    }
+  }
+
+  /**
+   * Builds the join key of a stream message from the stream field ids given at construction.
+   *
+   * @param message the stream message
+   * @return the composite key created by {@code createSamzaSqlCompositeKey}
+   */
+  @Override
+  public SamzaSqlCompositeKey getMessageKey(SamzaSqlRelMessage message) {
+    return createSamzaSqlCompositeKey(message, streamFieldIds);
+  }
+
+  /**
+   * Returns the join key of a table record, which is the key of the key-value pair itself.
+   *
+   * @param record the table record
+   * @return {@code record.getKey()}
+   */
+  @Override
+  public SamzaSqlCompositeKey getRecordKey(KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) {
+    return record.getKey();
+  }
+
+  /** No-op: this function holds no resources. */
+  @Override
+  public void close() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 30e5a9b..13300f7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -28,27 +28,27 @@ import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.apache.samza.sql.interfaces.SqlSystemSourceConfig;
/**
* Translator to translate the TableScans in relational graph to the corresponding input streams in the StreamGraph
* implementation
*/
-public class ScanTranslator {
+class ScanTranslator {
private final Map<String, SamzaRelConverter> relMsgConverters;
- private final Map<String, SqlSystemStreamConfig> systemStreamConfig;
+ private final Map<String, SqlSystemSourceConfig> systemStreamConfig;
- public ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlSystemStreamConfig> ssc) {
+ ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlSystemSourceConfig> ssc) {
relMsgConverters = converters;
this.systemStreamConfig = ssc;
}
- public void translate(final TableScan tableScan, final TranslatorContext context) {
+ void translate(final TableScan tableScan, final TranslatorContext context) {
StreamGraph streamGraph = context.getStreamGraph();
List<String> tableNameParts = tableScan.getTable().getQualifiedName();
- String sourceName = SqlSystemStreamConfig.getSourceFromSourceParts(tableNameParts);
+ String sourceName = SqlSystemSourceConfig.getSourceFromSourceParts(tableNameParts);
Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName));
SamzaRelConverter converter = relMsgConverters.get(sourceName);