You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/16 19:15:02 UTC
[3/3] samza git commit: Samza SQL implementation for basic projections,
filtering and UDFs
Samza SQL implementation for basic projections, filtering and UDFs
## Samza SQL implementation for basic projections, filtering and UDFs
## Design document:
https://docs.google.com/document/d/1bE-ZuPfTpntm1hT3GwQEShYDiTqU3IkxeP4-3ZcGHgU/edit?usp=sharing
Author: Srinivasulu Punuru <sp...@linkedin.com>
Reviewers: Yi Pan <ni...@gmail.com>
Closes #295 from srinipunuru/samza-sql.1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9fa8beed
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9fa8beed
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9fa8beed
Branch: refs/heads/master
Commit: 9fa8beed7ac37c33a0c01412b075a9c9a0b47441
Parents: 2e2e00e
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Thu Nov 16 11:14:32 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Nov 16 11:14:32 2017 -0800
----------------------------------------------------------------------
build.gradle | 33 ++-
gradle/dependency-versions.gradle | 2 +
.../org/apache/samza/sql/udfs/ScalarUdf.java | 48 ++++
.../apache/samza/sql/avro/AvroRelConverter.java | 183 ++++++++++++++
.../samza/sql/avro/AvroRelConverterFactory.java | 44 ++++
.../samza/sql/avro/AvroRelSchemaProvider.java | 28 +++
.../samza/sql/avro/AvroTypeFactoryImpl.java | 132 ++++++++++
...ConfigBasedAvroRelSchemaProviderFactory.java | 63 +++++
.../org/apache/samza/sql/data/Expression.java | 38 +++
.../samza/sql/data/RexToJavaCompiler.java | 224 +++++++++++++++++
.../sql/data/SamzaSqlExecutionContext.java | 61 +++++
.../samza/sql/data/SamzaSqlRelMessage.java | 123 ++++++++++
.../org/apache/samza/sql/fn/FlattenUdf.java | 36 +++
.../impl/ConfigBasedSourceResolverFactory.java | 71 ++++++
.../samza/sql/impl/ConfigBasedUdfResolver.java | 97 ++++++++
.../samza/sql/interfaces/RelSchemaProvider.java | 36 +++
.../interfaces/RelSchemaProviderFactory.java | 33 +++
.../samza/sql/interfaces/SamzaRelConverter.java | 46 ++++
.../interfaces/SamzaRelConverterFactory.java | 39 +++
.../samza/sql/interfaces/SourceResolver.java | 34 +++
.../sql/interfaces/SourceResolverFactory.java | 36 +++
.../sql/interfaces/SqlSystemStreamConfig.java | 74 ++++++
.../samza/sql/interfaces/UdfMetadata.java | 61 +++++
.../samza/sql/interfaces/UdfResolver.java | 35 +++
.../org/apache/samza/sql/planner/Checker.java | 93 +++++++
.../apache/samza/sql/planner/QueryPlanner.java | 153 ++++++++++++
.../sql/planner/SamzaSqlOperatorTable.java | 101 ++++++++
.../sql/planner/SamzaSqlScalarFunctionImpl.java | 84 +++++++
.../sql/planner/SamzaSqlUdfOperatorTable.java | 62 +++++
.../samza/sql/runner/SamzaSqlApplication.java | 56 +++++
.../sql/runner/SamzaSqlApplicationConfig.java | 245 +++++++++++++++++++
.../sql/runner/SamzaSqlApplicationRunner.java | 133 ++++++++++
.../apache/samza/sql/testutil/ConfigUtil.java | 62 +++++
.../org/apache/samza/sql/testutil/JsonUtil.java | 91 +++++++
.../samza/sql/testutil/ReflectionUtils.java | 62 +++++
.../samza/sql/testutil/SamzaSqlQueryParser.java | 188 ++++++++++++++
.../samza/sql/testutil/SqlFileParser.java | 103 ++++++++
.../samza/sql/translator/FilterTranslator.java | 62 +++++
.../samza/sql/translator/ProjectTranslator.java | 108 ++++++++
.../samza/sql/translator/QueryTranslator.java | 96 ++++++++
.../samza/sql/translator/ScanTranslator.java | 70 ++++++
.../samza/sql/translator/TranslatorContext.java | 162 ++++++++++++
.../apache/samza/sql/TestQueryTranslator.java | 103 ++++++++
.../sql/TestSamzaSqlApplicationConfig.java | 92 +++++++
.../samza/sql/TestSamzaSqlFileParser.java | 58 +++++
.../samza/sql/TestSamzaSqlQueryParser.java | 70 ++++++
.../samza/sql/TestSamzaSqlRelMessage.java | 46 ++++
.../samza/sql/avro/TestAvroRelConversion.java | 239 ++++++++++++++++++
.../samza/sql/avro/schemas/ComplexRecord.avsc | 143 +++++++++++
.../samza/sql/avro/schemas/ComplexRecord.java | 92 +++++++
.../apache/samza/sql/avro/schemas/MyFixed.java | 29 +++
.../samza/sql/avro/schemas/SimpleRecord.avsc | 39 +++
.../samza/sql/avro/schemas/SimpleRecord.java | 52 ++++
.../samza/sql/avro/schemas/SubRecord.java | 53 ++++
.../samza/sql/avro/schemas/TestEnumType.java | 31 +++
.../samza/sql/e2e/TestSamzaSqlEndToEnd.java | 137 +++++++++++
.../sql/system/ConsoleLoggingSystemFactory.java | 83 +++++++
.../samza/sql/system/SimpleSystemAdmin.java | 61 +++++
.../samza/sql/system/TestAvroSystemFactory.java | 156 ++++++++++++
.../samza/sql/testutil/MyTestArrayUdf.java | 37 +++
.../apache/samza/sql/testutil/MyTestUdf.java | 45 ++++
.../samza/sql/testutil/SamzaSqlTestConfig.java | 103 ++++++++
samza-sql/src/test/resources/log4j.xml | 43 ++++
settings.gradle | 5 +-
64 files changed, 5218 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index da08ee9..59ff5f2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -104,6 +104,12 @@ allprojects {
}
}
+idea {
+ project {
+ // Java language level for the IntelliJ IDEA project model configured by the Gradle idea plugin.
+ languageLevel = 1.8
+ }
+}
+
subprojects {
apply plugin: 'eclipse'
apply plugin: 'project-report'
@@ -185,7 +191,6 @@ project(":samza-core_$scalaVersion") {
}
}
-
project(':samza-azure') {
apply plugin: 'java'
apply plugin: 'checkstyle'
@@ -215,7 +220,6 @@ project(':samza-azure') {
}
}
-
project(":samza-autoscaling_$scalaVersion") {
apply plugin: 'scala'
apply plugin: 'checkstyle'
@@ -269,7 +273,24 @@ project(':samza-elasticsearch') {
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
- // Logging in tests is good.
+ testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
+ }
+}
+
+project(':samza-sql') {
+ apply plugin: 'java'
+
+ dependencies {
+ // samza-api provides the ScalarUdf interface (org.apache.samza.sql.udfs) used by SQL UDFs.
+ compile project(':samza-api')
+ compile project(":samza-kafka_$scalaVersion")
+ // avroVersion and calciteVersion are defined in gradle/dependency-versions.gradle.
+ compile "org.apache.avro:avro:$avroVersion"
+ compile "org.apache.calcite:calcite-core:$calciteVersion"
+ compile "org.slf4j:slf4j-api:$slf4jVersion"
+
+ testCompile project(":samza-test_$scalaVersion")
+ testCompile "junit:junit:$junitVersion"
+ testCompile "org.mockito:mockito-all:$mockitoVersion"
+
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
}
}
@@ -449,7 +470,7 @@ project(":samza-shell") {
}
// Usage: ./gradlew samza-shell:runJob \
- // -PconfigPath=file:///path/to/job/config.properties
+ // -PconfigPath=file:///path/to/job/config.properties
task runJob(type:JavaExec) {
description 'To run a job (defined in a properties file)'
main = 'org.apache.samza.job.JobRunner'
@@ -459,7 +480,7 @@ project(":samza-shell") {
}
// Usage: ./gradlew samza-shell:checkpointTool \
- // -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties
+ // -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties
task checkpointTool(type:JavaExec) {
description 'Command-line tool to inspect and manipulate the job’s checkpoint'
main = 'org.apache.samza.checkpoint.CheckpointTool'
@@ -470,7 +491,7 @@ project(":samza-shell") {
}
// Usage: ./gradlew samza-shell:kvPerformanceTest
- // -PconfigPath=file:///path/to/job/config.properties
+ // -PconfigPath=file:///path/to/job/config.properties
task kvPerformanceTest(type:JavaExec) {
description 'Command-line tool to run key-value performance tests'
main = 'org.apache.samza.test.performance.TestKeyValuePerformance'
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index a8af9a8..4f467ab 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -18,6 +18,8 @@
*/
ext {
apacheCommonsCollections4Version = "4.0"
+ avroVersion = "1.7.0"
+ calciteVersion = "1.14.0"
commonsCodecVersion = "1.9"
commonsCollectionVersion = "3.2.1"
commonsHttpClientVersion = "3.1"
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java b/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java
new file mode 100644
index 0000000..719cace
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/sql/udfs/ScalarUdf.java
@@ -0,0 +1,48 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.udfs;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * The interface for scalar UDFs. Every scalar UDF class needs to implement this interface, i.e. provide
+ * {@link #init(Config)} and {@link #execute(Object...)}.
+ *
+ * <p>The arguments passed to {@code execute} correspond, in order, to the arguments used when the UDF is
+ * invoked in a SQL statement. For example, assuming a UDF registered under the name {@code myudf}, the query
+ * {@code select myudf(id, name) from profile} passes the values of the {@code id} and {@code name} fields of
+ * each {@code profile} row to {@code execute}. The number and types of the SQL arguments must therefore be
+ * what the implementation expects; this interface itself does not check them.
+ */
+public interface ScalarUdf {
+  /**
+   * Performs any initialization the UDF needs before {@link #execute(Object...)} is used.
+   *
+   * @param udfConfig config specific to this UDF
+   */
+  void init(Config udfConfig);
+
+  /**
+   * Computes the UDF's result for one invocation.
+   *
+   * @param args the arguments supplied in the SQL invocation, in the order they appear there
+   * @return the value produced by the scalar UDF
+   */
+  Object execute(Object... args);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
new file mode 100644
index 0000000..1c17295
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -0,0 +1,183 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Converts Samza messages whose value is an Avro record into relational messages, and vice versa.
+ * The message key is carried over unchanged.
+ *
+ * <p>Conversion from Samza to relational message:
+ * The key of the Samza message is represented as a special column {@link SamzaSqlRelMessage#KEY_NAME}
+ * in the relational message.
+ * The value of the Samza message is expected to be an {@link IndexedRecord}. Each field of the relational
+ * schema is read from the record position that the Avro schema assigns to the field of the same name.
+ * A null value produces a relational message in which every field is null.
+ *
+ * <p>Conversion from relational to Samza message:
+ * The relational message is converted into an Avro {@link GenericRecord}. Every field of the relational
+ * message becomes a field of the GenericRecord except the field named {@link SamzaSqlRelMessage#KEY_NAME}.
+ * The key of the output Samza message is taken from {@link SamzaSqlRelMessage#getKey()}.
+ */
+public class AvroRelConverter implements SamzaRelConverter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroRelConverter.class);
+
+  protected final Config config;
+  private final Schema avroSchema;
+  private final RelDataType relationalSchema;
+
+  /**
+   * Converts Avro field values into the objects stored in the corresponding relational fields.
+   * The constant is selected by the simple class name of the relational field's type.
+   */
+  public enum AvroToRelObjConverter {
+
+    /**
+     * If the relational field type is ArraySqlType, the Avro field is expected to be a {@link List}
+     * (which includes {@link GenericData.Array}). Its elements are copied into a new {@link ArrayList}.
+     * A null or non-list value yields an empty list.
+     */
+    ArraySqlType {
+      @Override
+      Object convert(Object avroObj) {
+        ArrayList<Object> retVal = new ArrayList<>();
+        // GenericData.Array is itself a List, so a single instanceof check covers both representations.
+        if (avroObj instanceof List) {
+          retVal.addAll((List<?>) avroObj);
+        }
+        return retVal;
+      }
+    },
+
+    /**
+     * If the relational field type is MapSqlType, the Avro field is expected to be a {@link Map}.
+     * Its entries are copied into a new {@link HashMap}; a null value yields an empty map.
+     */
+    MapSqlType {
+      @Override
+      @SuppressWarnings("unchecked")
+      Object convert(Object obj) {
+        Map<String, Object> retVal = new HashMap<>();
+        if (obj != null) {
+          retVal.putAll((Map<String, ?>) obj);
+        }
+        return retVal;
+      }
+    },
+
+    /**
+     * If the relational field type is RelRecordType, the field is treated as an opaque object and
+     * moved to the relational field without any translation.
+     */
+    RelRecordType {
+      @Override
+      Object convert(Object obj) {
+        return obj;
+      }
+    },
+
+    /**
+     * If the relational field type is BasicSqlType, the field is moved to the relational field without
+     * any translation.
+     */
+    BasicSqlType {
+      @Override
+      Object convert(Object obj) {
+        return obj;
+      }
+    };
+
+    abstract Object convert(Object obj);
+  }
+
+  public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
+    this.config = config;
+    this.relationalSchema = schemaProvider.getRelationalSchema();
+    this.avroSchema = Schema.parse(schemaProvider.getSchema(systemStream));
+  }
+
+  /**
+   * Converts a Samza message whose value is an {@link IndexedRecord} (or null) into a relational message.
+   *
+   * @throws SamzaException if the value is neither null nor an IndexedRecord
+   */
+  @Override
+  public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> samzaMessage) {
+    List<Object> values = new ArrayList<>();
+    List<String> fieldNames = new ArrayList<>();
+    Object value = samzaMessage.getValue();
+    if (value instanceof IndexedRecord) {
+      IndexedRecord record = (IndexedRecord) value;
+      fieldNames.addAll(relationalSchema.getFieldNames());
+      values.addAll(relationalSchema.getFieldList()
+          .stream()
+          .map(x -> getRelField(x.getType(), record.get(this.avroSchema.getField(x.getName()).pos())))
+          .collect(Collectors.toList()));
+    } else if (value == null) {
+      fieldNames.addAll(relationalSchema.getFieldNames());
+      // Exactly one null per field, so that field names and values stay the same length.
+      IntStream.range(0, fieldNames.size()).forEach(x -> values.add(null));
+    } else {
+      String msg = "Avro message converter doesn't support messages of type " + value.getClass();
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+
+    return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, values);
+  }
+
+  /**
+   * Converts a relational message into a Samza message whose value is an Avro {@link GenericRecord}.
+   * The {@link SamzaSqlRelMessage#KEY_NAME} field is skipped.
+   */
+  @Override
+  public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
+    GenericRecord record = new GenericData.Record(this.avroSchema);
+    List<String> fieldNames = relMessage.getFieldNames();
+    List<Object> values = relMessage.getFieldValues();
+    for (int index = 0; index < fieldNames.size(); index++) {
+      String fieldName = fieldNames.get(index);
+      // Per the class contract the key column is not copied into the Avro record; the key is emitted
+      // separately (GenericData.Record#put(String, Object) rejects names not present in the schema).
+      if (!SamzaSqlRelMessage.KEY_NAME.equals(fieldName)) {
+        record.put(fieldName, values.get(index));
+      }
+    }
+
+    return new KV<>(relMessage.getKey(), record);
+  }
+
+  /** Converts one Avro field value using the converter that matches the relational type's class. */
+  private Object getRelField(RelDataType relType, Object avroObj) {
+    return AvroToRelObjConverter.valueOf(relType.getClass().getSimpleName()).convert(avroObj);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java
new file mode 100644
index 0000000..278735f
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverterFactory.java
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import java.util.HashMap;
+import java.util.Properties;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.SamzaRelConverter;
+import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Factory that creates {@link AvroRelConverter}s for SystemStreams whose message values are Avro records.
+ *
+ * <p>Converters are cached per {@link SystemStream}. The schema provider and config passed on the first call
+ * for a given SystemStream are used to build its converter. Later calls for the same SystemStream return that
+ * cached instance and ignore their schema provider and config arguments.
+ *
+ * <p>The schema provider must be an {@link AvroRelSchemaProvider}; it is cast to that type.
+ */
+public class AvroRelConverterFactory implements SamzaRelConverterFactory {
+
+  // One converter per SystemStream, created lazily on first request.
+  private final HashMap<SystemStream, SamzaRelConverter> relConverters = new HashMap<>();
+
+  @Override
+  public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider schemaProvider, Config config) {
+    return relConverters.computeIfAbsent(systemStream,
+        ss -> new AvroRelConverter(ss, (AvroRelSchemaProvider) schemaProvider, config));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
new file mode 100644
index 0000000..fb11624
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java
@@ -0,0 +1,28 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * A {@link RelSchemaProvider} for Avro-encoded streams. In addition to the relational schema, it exposes the
+ * Avro schema of a {@link SystemStream}.
+ */
+public interface AvroRelSchemaProvider extends RelSchemaProvider {
+  /**
+   * Returns the Avro schema of the given SystemStream.
+   *
+   * @param systemStream the SystemStream whose schema is requested
+   * @return the Avro schema in its string (JSON) form, as accepted by {@code Schema.parse}
+   */
+  String getSchema(SystemStream systemStream);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
new file mode 100644
index 0000000..74e15e9
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -0,0 +1,132 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.ArraySqlType;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates Calcite relational types from Avro schemas. A top-level Avro RECORD schema becomes a
+ * {@link RelRecordType}; ConfigBasedAvroRelSchemaProvider, for example, uses this to derive the relational
+ * schema of a stream. Every field type produced here is nullable.
+ */
+public class AvroTypeFactoryImpl extends SqlTypeFactoryImpl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroTypeFactoryImpl.class);
+
+  public AvroTypeFactoryImpl() {
+    super(RelDataTypeSystem.DEFAULT);
+  }
+
+  /**
+   * Builds the relational record type that mirrors the fields of an Avro record schema.
+   *
+   * @param schema the top-level Avro schema; must be of type RECORD
+   * @return a {@link RelRecordType} with one field per Avro field
+   * @throws SamzaException if the schema is not a RECORD, or a field has an unsupported type
+   */
+  public RelDataType createType(Schema schema) {
+    Schema.Type topLevelType = schema.getType();
+    if (topLevelType == Schema.Type.RECORD) {
+      return new RelRecordType(getRelFields(schema.getFields()));
+    }
+
+    String msg =
+        String.format("System supports only RECORD as top level avro type, But the Schema's type is %s", topLevelType);
+    LOG.error(msg);
+    throw new SamzaException(msg);
+  }
+
+  /** Maps each Avro field to a relational field with the same name. */
+  private List<RelDataTypeField> getRelFields(List<Schema.Field> fields) {
+    List<RelDataTypeField> converted = new ArrayList<>(fields.size());
+    for (Schema.Field avroField : fields) {
+      converted.add(new RelDataTypeFieldImpl(
+          avroField.name(),
+          // Positions are shifted by one. NOTE(review): presumably slot 0 is reserved for the message key
+          // column; confirm against SamzaSqlRelMessage.
+          avroField.pos() + 1,
+          getRelDataType(avroField.schema())));
+    }
+    return converted;
+  }
+
+  /** Maps a single Avro field schema to a nullable relational type. */
+  private RelDataType getRelDataType(Schema fieldSchema) {
+    Schema.Type avroType = fieldSchema.getType();
+    switch (avroType) {
+      case UNION:
+        return getRelTypeFromUnionTypes(fieldSchema.getTypes());
+      case MAP:
+        // JavaTypeFactoryImpl converts map into Map(ANY, ANY)
+        return super.createMapType(nullableSqlType(SqlTypeName.ANY), nullableSqlType(SqlTypeName.ANY));
+      case ARRAY:
+        // TODO JavaTypeFactoryImpl should convert Array into Array(ANY, ANY); until then arrays are typed ANY.
+      case RECORD:
+        // TODO Calcite execution engine doesn't support record type yet, so nested records are typed ANY.
+        return nullableSqlType(SqlTypeName.ANY);
+      case BOOLEAN:
+        return nullableSqlType(SqlTypeName.BOOLEAN);
+      case DOUBLE:
+        return nullableSqlType(SqlTypeName.DOUBLE);
+      case FLOAT:
+        return nullableSqlType(SqlTypeName.FLOAT);
+      case INT:
+        return nullableSqlType(SqlTypeName.INTEGER);
+      case LONG:
+        return nullableSqlType(SqlTypeName.BIGINT);
+      case STRING:
+      case ENUM:
+        return nullableSqlType(SqlTypeName.VARCHAR);
+      case BYTES:
+      case FIXED:
+        return nullableSqlType(SqlTypeName.VARBINARY);
+      default:
+        String msg = String.format("Field Type %s is not supported", avroType);
+        LOG.error(msg);
+        throw new SamzaException(msg);
+    }
+  }
+
+  /**
+   * Maps an Avro union to a relational type. A two-branch union in which one branch is NULL (the usual
+   * encoding of a nullable field) takes the type of the other branch. Any other union is typed VARCHAR.
+   */
+  private RelDataType getRelTypeFromUnionTypes(List<Schema> types) {
+    boolean twoBranches = types.size() == 2;
+    if (twoBranches && types.get(0).getType() == Schema.Type.NULL) {
+      return getRelDataType(types.get(1));
+    }
+    if (twoBranches && types.get(1).getType() == Schema.Type.NULL) {
+      return getRelDataType(types.get(0));
+    }
+    return nullableSqlType(SqlTypeName.VARCHAR);
+  }
+
+  /** Returns the nullable variant of the given SQL type. */
+  private RelDataType nullableSqlType(SqlTypeName typeName) {
+    return createTypeWithNullability(createSqlType(typeName), true);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java
new file mode 100644
index 0000000..4397a75
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/ConfigBasedAvroRelSchemaProviderFactory.java
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.avro;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Factory for {@link ConfigBasedAvroRelSchemaProvider}, an Avro schema provider that reads the schema of a
+ * SystemStream from static config. Schemas are configured using keys of the format
+ * {systemName}.{streamName}.schema.
+ */
+public class ConfigBasedAvroRelSchemaProviderFactory implements RelSchemaProviderFactory {
+
+  public static final String CFG_SOURCE_SCHEMA = "%s.%s.schema";
+
+  @Override
+  public RelSchemaProvider create(SystemStream systemStream, Config config) {
+    return new ConfigBasedAvroRelSchemaProvider(systemStream, config);
+  }
+
+  /**
+   * Avro schema provider backed by config. The schema for a SystemStream is the value of the
+   * {@link #CFG_SOURCE_SCHEMA} key formatted with the stream's system and stream names.
+   */
+  public static class ConfigBasedAvroRelSchemaProvider implements AvroRelSchemaProvider {
+    private final Config config;
+    private final SystemStream systemStream;
+
+    public ConfigBasedAvroRelSchemaProvider(SystemStream systemStream, Config config) {
+      this.systemStream = systemStream;
+      this.config = config;
+    }
+
+    /**
+     * Parses the configured Avro schema of this provider's SystemStream and converts it into a relational type.
+     *
+     * @throws SamzaException if no schema is configured for the SystemStream
+     */
+    @Override
+    public RelDataType getRelationalSchema() {
+      String schemaStr = getSchema(systemStream);
+      Schema schema = Schema.parse(schemaStr);
+      AvroTypeFactoryImpl avroTypeFactory = new AvroTypeFactoryImpl();
+      return avroTypeFactory.createType(schema);
+    }
+
+    /**
+     * Returns the Avro schema string configured for the given SystemStream.
+     *
+     * @throws SamzaException if the config key for the SystemStream is not set; previously a missing key
+     *     surfaced later as an uninformative failure inside {@code Schema.parse(null)}
+     */
+    @Override
+    public String getSchema(SystemStream systemStream) {
+      String configKey = String.format(CFG_SOURCE_SCHEMA, systemStream.getSystem(), systemStream.getStream());
+      String schemaStr = config.get(configKey);
+      if (schemaStr == null) {
+        throw new SamzaException(
+            String.format("No Avro schema configured for %s; expected config key '%s'", systemStream, configKey));
+      }
+      return schemaStr;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java b/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java
new file mode 100644
index 0000000..9386e31
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/Expression.java
@@ -0,0 +1,38 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import org.apache.calcite.DataContext;
+
+
+/**
+ * The interface implemented by the Java code that {@link RexToJavaCompiler} generates at runtime for
+ * relational expressions.
+ */
+public interface Expression {
+  /**
+   * Evaluates the compiled expression(s) against the columns of one row. The expressions take columns as
+   * input and may produce multiple values.
+   *
+   * @param context the Samza SQL execution context
+   * @param root the Calcite {@link DataContext} made available to the generated code
+   * @param inputValues the values of all relational columns of the row being evaluated
+   * @param results output array into which the generated code writes the value(s) it computes
+   */
+  void execute(SamzaSqlExecutionContext context, DataContext root, Object[] inputValues, Object[] results);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
new file mode 100644
index 0000000..21c81a8
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
@@ -0,0 +1,224 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.util.List;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.BlockStatement;
+import org.apache.calcite.linq4j.tree.ClassDeclaration;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MemberDeclaration;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.Pair;
+import org.apache.samza.SamzaException;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.commons.compiler.CompilerFactoryFactory;
+import org.codehaus.commons.compiler.IClassBodyEvaluator;
+import org.codehaus.commons.compiler.ICompilerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Compiles SQL row expressions ({@link RexNode}s) into Java classes implementing
+ * {@link org.apache.samza.sql.data.Expression}.
+ *
+ * <p>This is based on Calcite's {@link org.apache.calcite.interpreter.JaninoRexCompiler}. It first generates
+ * a Java AST with linq4j and then compiles it to a class using Janino.</p>
+ */
+public class RexToJavaCompiler {
+  private static final Logger log = LoggerFactory.getLogger(RexToJavaCompiler.class);
+
+  private final RexBuilder rexBuilder;
+
+  /**
+   * @param rexBuilder builder whose type factory is used to derive the input row type and the Java types of the
+   *                   generated code
+   */
+  public RexToJavaCompiler(RexBuilder rexBuilder) {
+    this.rexBuilder = rexBuilder;
+  }
+
+  /**
+   * Compiles relational expressions into an instance of {@link Expression}.
+   *
+   * <p>For example, the query {@code select id from profile}, where the profile table has an id (NUMBER) and a
+   * name (VARCHAR) column, results in the relational plan</p>
+   * <pre>
+   *   LogicalProject(id=[$1])
+   *     LogicalTableScan(table=[[profile]])
+   * </pre>
+   * <p>for which the arguments are</p>
+   * <ul>
+   *   <li>inputs: the EnumerableTableScan produced for the LogicalTableScan</li>
+   *   <li>nodes: [$1], a reference to the field at zero-based index 1 of the input row</li>
+   * </ul>
+   * <p>The projection {@code [$1]} is translated into Java code implementing {@link Expression}.</p>
+   *
+   * @param inputs input relations of the expressions; their row types are concatenated, in order, to form the
+   *               single input row type the expressions are evaluated against
+   * @param nodes relational expressions to compile; the value of the i-th one is written to
+   *              {@code outputValues[i]} by the generated code
+   * @return compiled expression of type {@link org.apache.samza.sql.data.Expression}
+   * @throws SamzaException if the generated code cannot be compiled or instantiated
+   */
+  public org.apache.samza.sql.data.Expression compile(List<RelNode> inputs, List<RexNode> nodes) {
+    // With multiple input relations, build a single input row type by concatenating the fields of all inputs.
+    final RelDataTypeFactory.FieldInfoBuilder fieldBuilder = rexBuilder.getTypeFactory().builder();
+    for (RelNode input : inputs) {
+      fieldBuilder.addAll(input.getRowType().getFieldList());
+    }
+    final RelDataType inputRowType = fieldBuilder.build();
+    final RexProgramBuilder programBuilder = new RexProgramBuilder(inputRowType, rexBuilder);
+    for (RexNode node : nodes) {
+      programBuilder.addProject(node, null);
+    }
+    final RexProgram program = programBuilder.getProgram();
+
+    final BlockBuilder builder = new BlockBuilder();
+    final ParameterExpression executionContext = Expressions.parameter(SamzaSqlExecutionContext.class, "context");
+    final ParameterExpression root = DataContext.ROOT;
+    final ParameterExpression inputValues = Expressions.parameter(Object[].class, "inputValues");
+    final ParameterExpression outputValues = Expressions.parameter(Object[].class, "outputValues");
+    final JavaTypeFactoryImpl javaTypeFactory = new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+
+    // Input field references ($0, $1, ...) are read as elements of the same "inputValues" parameter that the
+    // generated execute method declares.
+    final RexToLixTranslator.InputGetter inputGetter = new RexToLixTranslator.InputGetterImpl(ImmutableList.of(
+        Pair.<org.apache.calcite.linq4j.tree.Expression, PhysType>of(inputValues,
+            PhysTypeImpl.of(javaTypeFactory, inputRowType, JavaRowFormat.ARRAY, false))));
+
+    final List<org.apache.calcite.linq4j.tree.Expression> list =
+        RexToLixTranslator.translateProjects(program, javaTypeFactory, builder, null, root, inputGetter, null);
+    // Emit "outputValues[i] = <translated projection i>;" for every projection.
+    for (int i = 0; i < list.size(); i++) {
+      builder.add(Expressions.statement(
+          Expressions.assign(Expressions.arrayIndex(outputValues, Expressions.constant(i)), list.get(i))));
+    }
+    return createSamzaExpressionFromCalcite(executionContext, root, inputValues, outputValues, builder.toBlock());
+  }
+
+  /**
+   * Wraps a generated Java statement block into a class implementing {@link Expression} and instantiates it.
+   *
+   * <p>For the example in {@link #compile(List, List)}, the projection {@code [$1]} is translated into a
+   * statement block roughly like</p>
+   * <pre>
+   * {
+   *   outputValues[0] = (Integer) inputValues[1];
+   * }
+   * </pre>
+   * <p>which becomes the body of the generated {@code execute} method.</p>
+   *
+   * @param executionContext parameter expression for the execution context argument of {@code execute}
+   * @param dataContext parameter expression for the Calcite data context argument of {@code execute}
+   * @param inputValues parameter expression for the input values argument of {@code execute}
+   * @param outputValues parameter expression for the output values argument of {@code execute}
+   * @param block statement block that becomes the body of {@code execute}
+   * @return an instance of the generated class
+   * @throws SamzaException if the generated code cannot be compiled or instantiated
+   */
+  static org.apache.samza.sql.data.Expression createSamzaExpressionFromCalcite(ParameterExpression executionContext,
+      ParameterExpression dataContext, ParameterExpression inputValues, ParameterExpression outputValues,
+      BlockStatement block) {
+    final List<MemberDeclaration> declarations = Lists.newArrayList();
+
+    // Generates: public void execute(<executionContext>, <dataContext>, <inputValues>, <outputValues>) { <block> }
+    declarations.add(
+        Expressions.methodDecl(Modifier.PUBLIC, void.class, SamzaBuiltInMethod.EXPR_EXECUTE2.method.getName(),
+            ImmutableList.of(executionContext, dataContext, inputValues, outputValues), block));
+
+    final ClassDeclaration classDeclaration = Expressions.classDecl(Modifier.PUBLIC, "SqlExpression", null,
+        ImmutableList.<Type>of(org.apache.samza.sql.data.Expression.class), declarations);
+    // Only the member declarations are rendered; the class name and interfaces are applied from classDeclaration
+    // when the class body is compiled.
+    String s = Expressions.toString(declarations, "\n", false);
+
+    log.info("Generated code for expression: {}", s);
+
+    try {
+      return getExpression(classDeclaration, s);
+    } catch (Exception e) {
+      throw new SamzaException("Expression compilation failure.", e);
+    }
+  }
+
+  /**
+   * Compiles a class body with Janino and returns an instance of the resulting class.
+   *
+   * @param expr class declaration supplying the name and the implemented interfaces of the generated class
+   * @param s Java source of the class body (its member declarations)
+   * @return an instance of the compiled class, which implements {@link Expression}
+   * @throws CompileException if Janino fails to compile the class body
+   * @throws IOException if reading the class body source fails
+   * @throws IllegalStateException if no Janino compiler factory can be instantiated
+   */
+  static Expression getExpression(ClassDeclaration expr, String s) throws CompileException, IOException {
+    ICompilerFactory compilerFactory;
+    try {
+      compilerFactory = CompilerFactoryFactory.getDefaultCompilerFactory();
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to instantiate java compiler", e);
+    }
+    IClassBodyEvaluator cbe = compilerFactory.newClassBodyEvaluator();
+    cbe.setClassName(expr.name);
+    cbe.setImplementedInterfaces(expr.implemented.toArray(new Class[expr.implemented.size()]));
+    // Resolve classes referenced by the generated code (Samza, Calcite, UDFs) through this class's loader.
+    cbe.setParentClassLoader(RexToJavaCompiler.class.getClassLoader());
+    cbe.setDebuggingInformation(true, true, true);
+
+    return (org.apache.samza.sql.data.Expression) cbe.createInstance(new StringReader(s));
+  }
+
+  /**
+   * Reflective handles to the methods of {@link Expression} that generated code implements.
+   */
+  public enum SamzaBuiltInMethod {
+    EXPR_EXECUTE2(org.apache.samza.sql.data.Expression.class, "execute", SamzaSqlExecutionContext.class,
+        DataContext.class, Object[].class, Object[].class);
+
+    public final Method method;
+
+    /**
+     * Looks up the method {@code methodName(argumentTypes...)} on {@code clazz}.
+     */
+    SamzaBuiltInMethod(Class<?> clazz, String methodName, Class<?>... argumentTypes) {
+      this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
new file mode 100644
index 0000000..88bcb61
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlExecutionContext.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.UdfMetadata;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+public class SamzaSqlExecutionContext {
+
+ private final SamzaSqlApplicationConfig sqlConfig;
+ private final Map<String, UdfMetadata> udfMetadata;
+ private final Map<String, ScalarUdf> udfInstances = new HashMap<>();
+
+ public SamzaSqlExecutionContext(SamzaSqlApplicationConfig config) {
+ this.sqlConfig = config;
+ udfMetadata =
+ this.sqlConfig.getUdfMetadata().stream().collect(Collectors.toMap(UdfMetadata::getName, Function.identity()));
+ }
+
+ public ScalarUdf getOrCreateUdf(String clazz, String udfName) {
+ return udfInstances.computeIfAbsent(udfName, s -> createInstance(clazz, udfName));
+ }
+
+ public ScalarUdf createInstance(String clazz, String udfName) {
+ Config udfConfig = udfMetadata.get(udfName).getUdfConfig();
+ ScalarUdf scalarUdf = ReflectionUtils.createInstance(clazz);
+ if (scalarUdf == null) {
+ String msg = String.format("Couldn't create udf %s of class %s", udfName, clazz);
+ throw new SamzaException(msg);
+ }
+ scalarUdf.init(udfConfig);
+ return scalarUdf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
new file mode 100644
index 0000000..b5df545
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -0,0 +1,123 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.data;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+
+/**
+ * A relational row as seen by Samza SQL.
+ *
+ * <p>Every message carries an optional key plus an ordered list of column names and an ordered list of column
+ * values. Only the column names are kept as metadata today; richer column metadata (primary key, nullability,
+ * ...) could be added later if it becomes necessary.</p>
+ */
+public class SamzaSqlRelMessage {
+
+  /** Reserved column name under which the message key appears among relational fields. */
+  public static final String KEY_NAME = "__key__";
+
+  // Column values, without the key.
+  private final List<Object> value = new ArrayList<>();
+  // The key (only when it is non-null) followed by the column values.
+  private final List<Object> relFieldValues = new ArrayList<>();
+  // Column names, positionally aligned with the column values.
+  private final List<String> names = new ArrayList<>();
+  private final Object key;
+
+  /**
+   * Builds a message representing one row of a table.
+   *
+   * @param key key of the row; optional, so it may be {@code null}
+   * @param names ordered column names of the row
+   * @param values ordered column values of the row. A change-capture stream may carry delete events, in which
+   *               case every value can be {@code null}.
+   */
+  public SamzaSqlRelMessage(Object key, List<String> names, List<Object> values) {
+    this.key = key;
+    this.value.addAll(values);
+    this.names.addAll(names);
+    if (key != null) {
+      relFieldValues.add(key);
+    }
+    relFieldValues.addAll(this.value);
+  }
+
+  /**
+   * @return the names of all columns, in order (the key is not included)
+   */
+  public List<String> getFieldNames() {
+    return names;
+  }
+
+  /**
+   * @return the values of all columns, in order (the key is not included)
+   */
+  public List<Object> getFieldValues() {
+    return value;
+  }
+
+  /**
+   * @return the key, when it is non-null, followed by the values of all columns
+   */
+  public List<Object> getRelFieldValues() {
+    return this.relFieldValues;
+  }
+
+  /**
+   * @return the key of the row, possibly {@code null}
+   */
+  public Object getKey() {
+    return key;
+  }
+
+  /**
+   * Looks up a column value by column name.
+   *
+   * @param name column name to look up
+   * @return the value of the first column with that name, or {@link Optional#empty()} when no column has that
+   *         name or its value is {@code null}
+   */
+  public Optional<Object> getField(String name) {
+    int position = 0;
+    for (String fieldName : names) {
+      if (fieldName.equals(name)) {
+        return Optional.ofNullable(value.get(position));
+      }
+      position++;
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Builds a {@link SamzaSqlRelMessage} from relational field names and values.
+   *
+   * <p>If the names contain {@link #KEY_NAME}, the matching value is taken out and used as the message key.
+   * The input lists are copied and never modified.</p>
+   *
+   * @param fieldValues field values, possibly including the key
+   * @param fieldNames field names, possibly including {@link #KEY_NAME}
+   * @return the created message
+   */
+  public static SamzaSqlRelMessage createRelMessage(List<Object> fieldValues, List<String> fieldNames) {
+    List<String> namesCopy = new ArrayList<>(fieldNames);
+    List<Object> valuesCopy = new ArrayList<>(fieldValues);
+    int keyIndex = namesCopy.indexOf(KEY_NAME);
+    if (keyIndex < 0) {
+      return new SamzaSqlRelMessage(null, namesCopy, valuesCopy);
+    }
+    Object extractedKey = valuesCopy.remove(keyIndex);
+    namesCopy.remove(keyIndex);
+    return new SamzaSqlRelMessage(extractedKey, namesCopy, valuesCopy);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
new file mode 100644
index 0000000..39a26ce
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.fn;
+
+import java.util.List;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+public class FlattenUdf implements ScalarUdf {
+ @Override
+ public void init(Config udfConfig) {
+ }
+
+ public Object execute(Object... arg) {
+ List value = (List) arg[0];
+ return value != null && !value.isEmpty() ? value.get(0) : value;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
new file mode 100644
index 0000000..1203b25
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
@@ -0,0 +1,71 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.impl;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.sql.interfaces.SourceResolverFactory;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link SourceResolverFactory} whose resolvers work purely from static config.
+ *
+ * <p>A source must be of the form {@code <system>.<stream>}. The resolved {@link SqlSystemStreamConfig} carries the
+ * system name, the stream name and the subset of the config under the {@code <system>.} prefix.</p>
+ */
+public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigBasedSourceResolverFactory.class);
+
+  // NOTE(review): not referenced within this class; kept since it is public.
+  public static final String CFG_FMT_SAMZA_PREFIX = "systems.%s.";
+
+  @Override
+  public SourceResolver create(Config config) {
+    return new ConfigBasedSourceResolver(config);
+  }
+
+  /**
+   * Resolver over a fixed config. Declared static because it uses nothing from the enclosing factory instance,
+   * so it should not keep a hidden reference to it.
+   */
+  private static class ConfigBasedSourceResolver implements SourceResolver {
+    private final Config config;
+
+    public ConfigBasedSourceResolver(Config config) {
+      this.config = config;
+    }
+
+    /**
+     * @param source source name of the form {@code <system>.<stream>}
+     * @return the system name, the stream name and the system-level config of the source
+     * @throws SamzaException if the source does not split into exactly two components on '.'
+     */
+    @Override
+    public SqlSystemStreamConfig fetchSourceInfo(String source) {
+      String[] sourceComponents = source.split("\\.");
+      if (sourceComponents.length != 2) {
+        String msg = String.format("Source %s is not of the format <system>.<stream>", source);
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+      String systemName = sourceComponents[0];
+      String streamName = sourceComponents[1];
+
+      return new SqlSystemStreamConfig(systemName, streamName, fetchSystemConfigs(systemName));
+    }
+
+    // Config entries under the "<systemName>." prefix, as returned by Config#subset.
+    private Config fetchSystemConfigs(String systemName) {
+      return config.subset(systemName + ".");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
new file mode 100644
index 0000000..c98a4a1
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.impl;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.UdfMetadata;
+import org.apache.samza.sql.interfaces.UdfResolver;
+import org.apache.samza.sql.udfs.ScalarUdf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link UdfResolver} that loads the UDFs of a Samza SQL application from static config.
+ *
+ * <p>The UDF classes are given as a comma separated list in the config named {@value #CFG_UDF_CLASSES}. Every
+ * configured class is loaded and validated to ensure that it implements {@link ScalarUdf} and has a public method
+ * named {@value #UDF_METHOD_NAME}. The UDF name is the class's simple name with everything from its last
+ * case-insensitive occurrence of "udf" removed, e.g. {@code FlattenUdf} is registered as {@code Flatten}.</p>
+ */
+public class ConfigBasedUdfResolver implements UdfResolver {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigBasedUdfResolver.class);
+  public static final String CFG_UDF_CLASSES = "udfClasses";
+  public static final String UDF_METHOD_NAME = "execute";
+  // Marker (matched case-insensitively) that ends the UDF name inside the class's simple name.
+  private static final String UDF_NAME_MARKER = "udf";
+
+  private final ArrayList<UdfMetadata> udfs;
+
+  /**
+   * Loads and validates every class listed in {@value #CFG_UDF_CLASSES}.
+   *
+   * @param config properties holding the comma separated {@value #CFG_UDF_CLASSES} list; entries are trimmed
+   *               and blank entries are ignored
+   * @param udfConfig config from which each UDF's own config is taken as {@code udfConfig.subset(<udfName> + ".")}
+   * @throws SamzaException if a class cannot be loaded, does not implement {@link ScalarUdf}, has no public
+   *         {@value #UDF_METHOD_NAME} method, or no UDF name can be derived from its simple name
+   */
+  public ConfigBasedUdfResolver(Properties config, Config udfConfig) {
+    // Trim so that lists written as "a.FooUdf, b.BarUdf" load correctly.
+    List<String> udfClasses = Arrays.stream(config.getProperty(CFG_UDF_CLASSES, "").split(","))
+        .map(String::trim)
+        .filter(StringUtils::isNotBlank)
+        .collect(Collectors.toList());
+    udfs = new ArrayList<>();
+    for (String udfClassName : udfClasses) {
+      Class<?> udfClass;
+      try {
+        udfClass = Class.forName(udfClassName);
+      } catch (ClassNotFoundException e) {
+        String msg = String.format("Couldn't load the udf class %s", udfClassName);
+        LOG.error(msg, e);
+        throw new SamzaException(msg, e);
+      }
+
+      if (!ScalarUdf.class.isAssignableFrom(udfClass)) {
+        String msg = String.format("Udf class %s is not extended from %s", udfClassName, ScalarUdf.class.getName());
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+
+      Optional<Method> udfMethod =
+          Arrays.stream(udfClass.getMethods()).filter(x -> x.getName().equals(UDF_METHOD_NAME)).findFirst();
+
+      if (!udfMethod.isPresent()) {
+        String msg = String.format("Udf Class %s doesn't implement method named %s", udfClassName, UDF_METHOD_NAME);
+        LOG.error(msg);
+        throw new SamzaException(msg);
+      }
+
+      String udfName = deriveUdfName(udfClass);
+      udfs.add(new UdfMetadata(udfName, udfMethod.get(), udfConfig.subset(udfName + ".")));
+    }
+  }
+
+  /**
+   * Derives the UDF name by cutting the class's simple name at its last case-insensitive occurrence of "udf".
+   *
+   * @throws SamzaException if the simple name has no such occurrence or the derived name would be empty
+   */
+  private static String deriveUdfName(Class<?> udfClass) {
+    String simpleName = udfClass.getSimpleName();
+    int udfIndex = lastIndexOfIgnoreCase(simpleName, UDF_NAME_MARKER);
+    if (udfIndex <= 0) {
+      String msg = String.format("Couldn't derive the udf name from class %s: its simple name must contain"
+          + " \"Udf\" preceded by at least one character", udfClass.getName());
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+    return simpleName.substring(0, udfIndex);
+  }
+
+  // Locale-independent, index-preserving case-insensitive lastIndexOf (unlike toLowerCase().lastIndexOf()).
+  private static int lastIndexOfIgnoreCase(String text, String token) {
+    for (int i = text.length() - token.length(); i >= 0; i--) {
+      if (text.regionMatches(true, i, token, 0, token.length())) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  @Override
+  public Collection<UdfMetadata> getUdfs() {
+    return udfs;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java
new file mode 100644
index 0000000..ee95224
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProvider.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.calcite.rel.type.RelDataType;
+
+
+/**
+ * Source of the relational schema of a system stream; the Samza SQL application obtains the schema it works
+ * with through this interface.
+ */
+public interface RelSchemaProvider {
+  /**
+   * Returns the schema of the system stream converted to a relational schema. A single schema may have more than
+   * one table associated with it.
+   *
+   * @return the relational schema corresponding to the system stream
+   */
+  RelDataType getRelationalSchema();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
new file mode 100644
index 0000000..c614cdf
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/RelSchemaProviderFactory.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+/**
+ * Factory for {@link RelSchemaProvider} instances.
+ */
+public interface RelSchemaProviderFactory {
+
+  /**
+   * Creates the {@link RelSchemaProvider} for a system stream.
+   *
+   * @param systemStream system stream whose relational schema the provider supplies
+   * @param config config needed to create the {@link RelSchemaProvider}
+   * @return the created {@link RelSchemaProvider}
+   */
+  RelSchemaProvider create(SystemStream systemStream, Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
new file mode 100644
index 0000000..12d5f28
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java
@@ -0,0 +1,46 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.operators.KV;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+
+
+/**
+ * Converts between Samza messages and the relational messages processed by the Calcite engine.
+ *
+ * <p>A {@link SamzaRelConverter} is configured per system, so different systems can use different converters.</p>
+ */
+public interface SamzaRelConverter {
+  /**
+   * Converts a Samza message into a relational message.
+   *
+   * @param message key and value of the Samza message to convert
+   * @return the relational message extracted from the Samza message
+   */
+  SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> message);
+
+  /**
+   * Converts a relational message back into an output Samza message.
+   *
+   * @param relMessage relational message to convert
+   * @return the key and value of the resulting Samza message
+   */
+  KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
new file mode 100644
index 0000000..f239df6
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverterFactory.java
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Creates {@link SamzaRelConverter} instances.
+ */
+public interface SamzaRelConverterFactory {
+
+  /**
+   * Builds the {@link SamzaRelConverter} for a system stream. The framework invokes this whenever it needs the
+   * converter associated with a system.
+   *
+   * @param systemStream the system stream the converter is being created for.
+   * @param relSchemaProvider the {@link RelSchemaProvider} made available to the converter.
+   * @param config configuration used to build the converter.
+   * @return the newly created converter.
+   */
+  SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
new file mode 100644
index 0000000..ac3fd31
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolver.java
@@ -0,0 +1,34 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+/**
+ * Resolves a source name used by a Samza SQL application into its {@link SqlSystemStreamConfig}.
+ */
+public interface SourceResolver {
+
+  /**
+   * Looks up the system stream configuration for a source.
+   *
+   * @param sourceName name of the source to resolve.
+   * @return the {@link SqlSystemStreamConfig} corresponding to {@code sourceName}.
+   */
+  SqlSystemStreamConfig fetchSourceInfo(String sourceName);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
new file mode 100644
index 0000000..274a6b1
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SourceResolverFactory.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Creates the {@link SourceResolver} used by a Samza SQL application.
+ */
+public interface SourceResolverFactory {
+
+  /**
+   * Builds a {@link SourceResolver}. Invoked while the application is being initialized.
+   *
+   * @param config configuration for the resolver.
+   * @return the resolver that was created.
+   */
+  SourceResolver create(Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
new file mode 100644
index 0000000..df21784
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
@@ -0,0 +1,74 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Configuration associated with a single system stream used by Samza SQL.
+ *
+ * <p>All fields are final and assigned exactly once, in the constructor.
+ */
+public class SqlSystemStreamConfig {
+
+  /** System-level config key naming the {@link SamzaRelConverter} to use. Required. */
+  public static final String CFG_SAMZA_REL_CONVERTER = "samzaRelConverterName";
+
+  /** System-level config key naming the relational schema provider to use. Not validated by this class. */
+  public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
+
+  private final String systemName;
+  private final String streamName;
+  private final SystemStream systemStream;
+  private final String samzaRelConverterName;
+  // Final like every other field: it is only ever assigned in the constructor.
+  private final String relSchemaProviderName;
+
+  /**
+   * Creates the config for the stream {@code streamName} of system {@code systemName}.
+   *
+   * @param systemName name of the system the stream belongs to.
+   * @param streamName name of the stream.
+   * @param systemConfig system-level config; must define {@value #CFG_SAMZA_REL_CONVERTER} and may define
+   *        {@value #CFG_REL_SCHEMA_PROVIDER}.
+   * @throws IllegalArgumentException if {@value #CFG_SAMZA_REL_CONVERTER} is not set or is empty.
+   */
+  public SqlSystemStreamConfig(String systemName, String streamName, Config systemConfig) {
+    this.systemName = systemName;
+    this.streamName = streamName;
+    this.systemStream = new SystemStream(systemName, streamName);
+
+    this.samzaRelConverterName = systemConfig.get(CFG_SAMZA_REL_CONVERTER);
+    this.relSchemaProviderName = systemConfig.get(CFG_REL_SCHEMA_PROVIDER);
+    Validate.notEmpty(samzaRelConverterName,
+        String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+  }
+
+  /** Returns the name of the system. */
+  public String getSystemName() {
+    return systemName;
+  }
+
+  /** Returns the name of the stream. */
+  public String getStreamName() {
+    return streamName;
+  }
+
+  /** Returns the value configured under {@value #CFG_SAMZA_REL_CONVERTER}; never empty. */
+  public String getSamzaRelConverterName() {
+    return samzaRelConverterName;
+  }
+
+  /** Returns the value read from {@value #CFG_REL_SCHEMA_PROVIDER}; this class does not validate it. */
+  public String getRelSchemaProviderName() {
+    return relSchemaProviderName;
+  }
+
+  /** Returns the {@link SystemStream} built from the system and stream names. */
+  public SystemStream getSystemStream() {
+    return systemStream;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
new file mode 100644
index 0000000..b1a2d6d
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import java.lang.reflect.Method;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Describes a user-defined function (UDF) available to Samza SQL: its name, the {@link Method} corresponding to it,
+ * and the config associated with it.
+ */
+public class UdfMetadata {
+
+  private final String name;
+  private final Method udfMethod;
+  private final Config udfConfig;
+
+  /**
+   * @param name name of the UDF.
+   * @param udfMethod the {@link Method} corresponding to the UDF.
+   * @param udfConfig config associated with the UDF.
+   */
+  public UdfMetadata(String name, Method udfMethod, Config udfConfig) {
+    this.udfConfig = udfConfig;
+    this.udfMethod = udfMethod;
+    this.name = name;
+  }
+
+  /**
+   * @return the name of the UDF.
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return the {@link Method} corresponding to the UDF.
+   */
+  public Method getUdfMethod() {
+    return udfMethod;
+  }
+
+  /**
+   * @return the config associated with the UDF.
+   */
+  public Config getUdfConfig() {
+    return udfConfig;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java
new file mode 100644
index 0000000..7e84118
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfResolver.java
@@ -0,0 +1,35 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.interfaces;
+
+import java.util.Collection;
+
+/**
+ * Plugin through which Samza SQL discovers the UDFs it can use.
+ */
+public interface UdfResolver {
+
+  /**
+   * Lists the UDFs available in the system. The framework calls this while it is initializing.
+   *
+   * @return the {@link UdfMetadata} of each available UDF.
+   */
+  Collection<UdfMetadata> getUdfs();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9fa8beed/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
new file mode 100644
index 0000000..851c895
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
@@ -0,0 +1,93 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.util.Map;
+
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.collect.Maps;
+
+
+/**
+ * Operand checker that constrains only the number of operands of a call; operand types are always accepted
+ * (see {@link #checkOperandTypes}).
+ *
+ * <p>Instances are cached per {@code (min, max)} operand-count range; obtain them through
+ * {@link #getChecker(int, int)} or {@link #ANY_CHECKER}.
+ */
+class Checker implements SqlOperandTypeChecker {
+
+  /** Checker that accepts any number of operands. */
+  public static final Checker ANY_CHECKER = new Checker();
+
+  /**
+   * Cache of checkers keyed by {@code (min, max)}. The map is static and therefore shared JVM-wide, so it must be
+   * a concurrent map populated atomically via {@code computeIfAbsent}; the previous unsynchronized
+   * check-then-put on a {@code HashMap} could race and corrupt the map.
+   */
+  private static final Map<Pair<Integer, Integer>, Checker> checkerMap = Maps.newConcurrentMap();
+
+  private final SqlOperandCountRange range;
+
+  /**
+   * Returns a checker accepting between {@code min} and {@code max} operands, reusing the cached instance for
+   * the same range when one exists.
+   *
+   * @param min minimum number of operands.
+   * @param max maximum number of operands.
+   * @return the checker for the {@code (min, max)} range.
+   */
+  public static Checker getChecker(int min, int max) {
+    final Pair<Integer, Integer> key = Pair.of(min, max);
+    // Exact-count range when min == max, otherwise a between(min, max) range.
+    return checkerMap.computeIfAbsent(key, k -> min == max ? new Checker(min) : new Checker(min, max));
+  }
+
+  /** Creates a checker that requires exactly {@code size} operands. */
+  private Checker(int size) {
+    range = SqlOperandCountRanges.of(size);
+  }
+
+  /** Creates a checker whose operand count range is {@code SqlOperandCountRanges.between(min, max)}. */
+  private Checker(int min, int max) {
+    range = SqlOperandCountRanges.between(min, max);
+  }
+
+  /** Creates a checker that accepts any number of operands. */
+  private Checker() {
+    range = SqlOperandCountRanges.any();
+  }
+
+  /** Accepts every combination of operand types. */
+  @Override
+  public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+    return true;
+  }
+
+  @Override
+  public SqlOperandCountRange getOperandCountRange() {
+    return range;
+  }
+
+  // NOTE(review): the "(Drill - Opaque)" label looks carried over from Apache Drill; left unchanged because it is
+  // user-visible text.
+  @Override
+  public String getAllowedSignatures(SqlOperator op, String opName) {
+    return opName + "(Drill - Opaque)";
+  }
+
+  @Override
+  public Consistency getConsistency() {
+    return Consistency.NONE;
+  }
+
+  /** No operand is treated as optional. */
+  @Override
+  public boolean isOptional(int i) {
+    return false;
+  }
+}
\ No newline at end of file