You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/17 18:06:00 UTC
samza git commit: SAMZA-1556: Adding support for multi level sources in queries
Repository: samza
Updated Branches:
refs/heads/master 3a9e80642 -> fcd2b9a53
SAMZA-1556: Adding support for multi level sources in queries
Right now Samza SQL only supports queries whose sources have exactly two levels, e.g. `select * from foo.bar`. However, some sources are identified through more than two levels, for example `select * from kafka.clusterName.topicName`.
This change adds support for SQL queries whose sources have more than two levels.
Author: Srinivasulu Punuru <sp...@linkedin.com>
Reviewers: Miguel S<mi...@linkedin.com>, Aditya T<at...@linkedin.com>
Closes #405 from srinipunuru/multi-level.1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fcd2b9a5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fcd2b9a5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fcd2b9a5
Branch: refs/heads/master
Commit: fcd2b9a53367c2e0b53df7a1fb7eedc8086b285c
Parents: 3a9e806
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Wed Jan 17 10:06:50 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jan 17 10:06:50 2018 -0800
----------------------------------------------------------------------
.../impl/ConfigBasedSourceResolverFactory.java | 8 ++--
.../sql/interfaces/SqlSystemStreamConfig.java | 47 +++++++++++++++++-
.../apache/samza/sql/planner/QueryPlanner.java | 38 ++++++++++-----
.../sql/runner/SamzaSqlApplicationConfig.java | 23 +++++----
.../sql/runner/SamzaSqlApplicationRunner.java | 3 ++
.../samza/sql/translator/QueryTranslator.java | 5 +-
.../samza/sql/translator/ScanTranslator.java | 26 ++++------
.../apache/samza/sql/TestQueryTranslator.java | 2 +-
.../samza/sql/testutil/SamzaSqlTestConfig.java | 2 +-
.../sql/testutil/TestSourceResolverFactory.java | 50 ++++++++++++++++++++
10 files changed, 154 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
index 1203b25..a2d8b0c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
@@ -29,8 +29,8 @@ import org.slf4j.LoggerFactory;
/**
- * Schema Resolver that uses static config to return a config corresponding to a system stream.
- * Schemas are configured using the config of format {systemName}.{streamName}.schema.
+ * Source Resolver implementation that uses static config to return a config corresponding to a system stream.
+ * This Source resolver implementation supports sources of type {systemName}.{streamName}
*/
public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
@@ -53,8 +53,10 @@ public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
@Override
public SqlSystemStreamConfig fetchSourceInfo(String source) {
String[] sourceComponents = source.split("\\.");
+
+ // This source resolver expects sources of format {systemName}.{streamName}
if (sourceComponents.length != 2) {
- String msg = String.format("Source %s is not of the format <system>.<stream>", source);
+ String msg = String.format("Source %s is not of the format {systemName}.{streamName{", source);
LOG.error(msg);
throw new SamzaException(msg);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
index df21784..d8965a4 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
@@ -19,8 +19,13 @@
package org.apache.samza.sql.interfaces;
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
import org.apache.commons.lang.Validate;
import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.system.SystemStream;
@@ -38,18 +43,48 @@ public class SqlSystemStreamConfig {
private final String samzaRelConverterName;
private final SystemStream systemStream;
+
+ private final String source;
private String relSchemaProviderName;
+ private Config config;
+
+ private List<String> sourceParts;
+
public SqlSystemStreamConfig(String systemName, String streamName, Config systemConfig) {
+ this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig);
+ }
+
+ public SqlSystemStreamConfig(String systemName, String streamName, List<String> sourceParts,
+ Config systemConfig) {
+
+ HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
this.systemName = systemName;
this.streamName = streamName;
+ this.source = getSourceFromSourceParts(sourceParts);
+ this.sourceParts = sourceParts;
this.systemStream = new SystemStream(systemName, streamName);
- samzaRelConverterName = systemConfig.get(CFG_SAMZA_REL_CONVERTER);
- relSchemaProviderName = systemConfig.get(CFG_REL_SCHEMA_PROVIDER);
+ samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
Validate.notEmpty(samzaRelConverterName,
String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+
+ relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
+
+ // Removing the Samza SQL specific configs to get the remaining Samza configs.
+ streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
+ streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
+
+ config = new MapConfig(streamConfigs);
+ }
+
+ public static String getSourceFromSourceParts(List<String> sourceParts) {
+ return Joiner.on(".").join(sourceParts);
+ }
+
+ public List<String> getSourceParts() {
+ return sourceParts;
}
public String getSystemName() {
@@ -71,4 +106,12 @@ public class SqlSystemStreamConfig {
public SystemStream getSystemStream() {
return systemStream;
}
+
+ public Config getConfig() {
+ return config;
+ }
+
+ public String getSource() {
+ return source;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index ce03ba3..061c03f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -23,7 +23,6 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -69,11 +68,16 @@ import org.slf4j.LoggerFactory;
public class QueryPlanner {
private static final Logger LOG = LoggerFactory.getLogger(QueryPlanner.class);
- private final Map<String, SqlSystemStreamConfig> systemStreamConfigBySource;
private final Collection<UdfMetadata> udfMetadata;
- private final Map<SystemStream, RelSchemaProvider> relSchemaProviders;
- public QueryPlanner(Map<SystemStream, RelSchemaProvider> relSchemaProviders,
+ // Mapping between the source to the RelSchemaProvider corresponding to the source.
+ private final Map<String, RelSchemaProvider> relSchemaProviders;
+
+ // Mapping between the source to the SqlSystemStreamConfig corresponding to the source.
+ private final Map<String, SqlSystemStreamConfig> systemStreamConfigBySource;
+
+
+ public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
Map<String, SqlSystemStreamConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
this.relSchemaProviders = relSchemaProviders;
this.systemStreamConfigBySource = systemStreamConfigBySource;
@@ -85,16 +89,26 @@ public class QueryPlanner {
Connection connection = DriverManager.getConnection("jdbc:calcite:");
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
- Map<String, SchemaPlus> systemSchemas = new HashMap<>();
for (SqlSystemStreamConfig ssc : systemStreamConfigBySource.values()) {
-
- RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSystemStream());
- SchemaPlus systemSchema =
- systemSchemas.computeIfAbsent(ssc.getSystemName(), s -> rootSchema.add(s, new AbstractSchema()));
- RelDataType relationalSchema = relSchemaProvider.getRelationalSchema();
-
- systemSchema.add(ssc.getStreamName(), createTableFromRelSchema(relationalSchema));
+ SchemaPlus previousLevelSchema = rootSchema;
+ List<String> sourceParts = ssc.getSourceParts();
+ RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());
+
+ for (String sourcePart : sourceParts) {
+ if (!sourcePart.equalsIgnoreCase(ssc.getStreamName())) {
+ SchemaPlus sourcePartSchema = rootSchema.getSubSchema(sourcePart);
+ if (sourcePartSchema == null) {
+ sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema());
+ }
+ previousLevelSchema = sourcePartSchema;
+ } else {
+ // If the source part is the streamName, then fetch the schema corresponding to the stream and register.
+ RelDataType relationalSchema = relSchemaProvider.getRelationalSchema();
+ previousLevelSchema.add(ssc.getStreamName(), createTableFromRelSchema(relationalSchema));
+ break;
+ }
+ }
}
List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index f029745..227a0f1 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -78,8 +78,8 @@ public class SamzaSqlApplicationConfig {
public static final String CFG_UDF_RESOLVER = "samza.sql.udfResolver";
public static final String CFG_FMT_UDF_RESOLVER_DOMAIN = "samza.sql.udfResolver.%s.";
- private final Map<SystemStream, RelSchemaProvider> relSchemaProvidersBySystemStream;
- private final Map<SystemStream, SamzaRelConverter> samzaRelConvertersBySystemStream;
+ private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
+ private final Map<String, SamzaRelConverter> samzaRelConvertersBySource;
private SourceResolver sourceResolver;
private UdfResolver udfResolver;
@@ -87,7 +87,6 @@ public class SamzaSqlApplicationConfig {
private final Collection<UdfMetadata> udfMetadata;
private final Map<String, SqlSystemStreamConfig> inputSystemStreamConfigBySource;
-
private final Map<String, SqlSystemStreamConfig> outputSystemStreamConfigsBySource;
private final List<String> sql;
@@ -117,17 +116,17 @@ public class SamzaSqlApplicationConfig {
.collect(Collectors.toMap(Function.identity(), x -> sourceResolver.fetchSourceInfo(x)));
systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
- relSchemaProvidersBySystemStream = systemStreamConfigs.stream()
- .collect(Collectors.toMap(SqlSystemStreamConfig::getSystemStream,
+ relSchemaProvidersBySource = systemStreamConfigs.stream()
+ .collect(Collectors.toMap(SqlSystemStreamConfig::getSource,
x -> initializePlugin("RelSchemaProvider", x.getRelSchemaProviderName(), staticConfig,
CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
(o, c) -> ((RelSchemaProviderFactory) o).create(x.getSystemStream(), c))));
- samzaRelConvertersBySystemStream = systemStreamConfigs.stream()
- .collect(Collectors.toMap(SqlSystemStreamConfig::getSystemStream,
+ samzaRelConvertersBySource = systemStreamConfigs.stream()
+ .collect(Collectors.toMap(SqlSystemStreamConfig::getSource,
x -> initializePlugin("SamzaRelConverter", x.getSamzaRelConverterName(), staticConfig,
CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
- relSchemaProvidersBySystemStream.get(x.getSystemStream()), c))));
+ relSchemaProvidersBySource.get(x.getSource()), c))));
}
private static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
@@ -235,11 +234,11 @@ public class SamzaSqlApplicationConfig {
return outputSystemStreamConfigsBySource;
}
- public Map<SystemStream, SamzaRelConverter> getSamzaRelConverters() {
- return samzaRelConvertersBySystemStream;
+ public Map<String, SamzaRelConverter> getSamzaRelConverters() {
+ return samzaRelConvertersBySource;
}
- public Map<SystemStream, RelSchemaProvider> getRelSchemaProviders() {
- return relSchemaProvidersBySystemStream;
+ public Map<String, RelSchemaProvider> getRelSchemaProviders() {
+ return relSchemaProvidersBySource;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 57d889f..5215f7e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -85,10 +85,13 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
SqlSystemStreamConfig inputSystemStreamConfig = sourceResolver.fetchSourceInfo(inputSource);
newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
inputSystemStreamConfig.getSystemName());
+ newConfig.putAll(inputSystemStreamConfig.getConfig());
}
+
SqlSystemStreamConfig outputSystemStreamConfig = sourceResolver.fetchSourceInfo(query.getOutputSource());
newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()),
outputSystemStreamConfig.getSystemName());
+ newConfig.putAll(outputSystemStreamConfig.getConfig());
}
if (localRunner) {
http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index ab17018..87e37f4 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -50,7 +50,8 @@ public class QueryTranslator {
public QueryTranslator(SamzaSqlApplicationConfig sqlConfig) {
this.sqlConfig = sqlConfig;
- scanTranslator = new ScanTranslator(sqlConfig.getSamzaRelConverters());
+ scanTranslator =
+ new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource());
}
public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamGraph streamGraph) {
@@ -87,7 +88,7 @@ public class QueryTranslator {
SqlSystemStreamConfig outputSystemConfig =
sqlConfig.getOutputSystemStreamConfigsBySource().get(queryInfo.getOutputSource());
- SamzaRelConverter samzaMsgConverter = sqlConfig.getSamzaRelConverters().get(outputSystemConfig.getSystemStream());
+ SamzaRelConverter samzaMsgConverter = sqlConfig.getSamzaRelConverters().get(queryInfo.getOutputSource());
MessageStreamImpl<SamzaSqlRelMessage> stream =
(MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId());
MessageStream<KV<Object, Object>> outputStream = stream.map(samzaMsgConverter::convertToSamzaMessage);
http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 202bdbd..30e5a9b 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -21,19 +21,14 @@ package org.apache.samza.sql.translator;
import java.util.List;
import java.util.Map;
-
import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.commons.lang.Validate;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
-
-import com.google.common.base.Joiner;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
/**
@@ -42,25 +37,22 @@ import org.apache.samza.system.SystemStream;
*/
public class ScanTranslator {
- private final Map<SystemStream, SamzaRelConverter> relMsgConverters;
+ private final Map<String, SamzaRelConverter> relMsgConverters;
+ private final Map<String, SqlSystemStreamConfig> systemStreamConfig;
- public ScanTranslator(Map<SystemStream, SamzaRelConverter> converters) {
+ public ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlSystemStreamConfig> ssc) {
relMsgConverters = converters;
+ this.systemStreamConfig = ssc;
}
public void translate(final TableScan tableScan, final TranslatorContext context) {
StreamGraph streamGraph = context.getStreamGraph();
List<String> tableNameParts = tableScan.getTable().getQualifiedName();
- Validate.isTrue(tableNameParts.size() == 2,
- String.format("table name %s is not of the format <SystemName>.<StreamName>",
- Joiner.on(".").join(tableNameParts)));
-
- String streamName = tableNameParts.get(1);
- String systemName = tableNameParts.get(0);
- SystemStream systemStream = new SystemStream(systemName, streamName);
+ String sourceName = SqlSystemStreamConfig.getSourceFromSourceParts(tableNameParts);
- Validate.isTrue(relMsgConverters.containsKey(systemStream), String.format("Unknown system %s", systemName));
- SamzaRelConverter converter = relMsgConverters.get(systemStream);
+ Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName));
+ SamzaRelConverter converter = relMsgConverters.get(sourceName);
+ String streamName = systemStreamConfig.get(sourceName).getStreamName();
MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream(streamName);
MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(converter::convertToRelMessage);
http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
index 7fa9974..04fdec5 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
@@ -39,7 +39,7 @@ public class TestQueryTranslator {
public void testTranslate() {
Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
- "Insert into testavro.outputTopic select MyTest(id) from testavro.SIMPLE1");
+ "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1");
Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 008eb82..92766f6 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -59,7 +59,7 @@ public class SamzaSqlTestConfig {
String configSourceResolverDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
staticConfigs.put(configSourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
- ConfigBasedSourceResolverFactory.class.getName());
+ TestSourceResolverFactory.class.getName());
staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
new file mode 100644
index 0000000..b9cf803
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
@@ -0,0 +1,50 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.Arrays;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.sql.interfaces.SourceResolverFactory;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+
+
+public class TestSourceResolverFactory implements SourceResolverFactory {
+ @Override
+ public SourceResolver create(Config config) {
+ return new TestSourceResolver(config);
+ }
+
+ private class TestSourceResolver implements SourceResolver {
+ private final Config config;
+
+ public TestSourceResolver(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public SqlSystemStreamConfig fetchSourceInfo(String sourceName) {
+ String[] sourceComponents = sourceName.split("\\.");
+ Config systemConfigs = config.subset(sourceComponents[0] + ".");
+ return new SqlSystemStreamConfig(sourceComponents[0], sourceComponents[sourceComponents.length - 1],
+ Arrays.asList(sourceComponents), systemConfigs);
+ }
+ }
+}