You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/17 18:06:00 UTC

samza git commit: SAMZA-1556: Adding support for multi level sources in queries

Repository: samza
Updated Branches:
  refs/heads/master 3a9e80642 -> fcd2b9a53


SAMZA-1556: Adding support for multi level sources in queries

Right now Samza SQL supports queries whose sources have just two levels, i.e. `select * from foo.bar`. But there can be sources that are identified through multiple levels, e.g. `select * from kafka.clusterName.topicName`.

This change adds support for SQL queries whose sources have more than two levels.

Author: Srinivasulu Punuru <sp...@linkedin.com>

Reviewers: Miguel S<mi...@linkedin.com>, Aditya T<at...@linkedin.com>

Closes #405 from srinipunuru/multi-level.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fcd2b9a5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fcd2b9a5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fcd2b9a5

Branch: refs/heads/master
Commit: fcd2b9a53367c2e0b53df7a1fb7eedc8086b285c
Parents: 3a9e806
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Wed Jan 17 10:06:50 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jan 17 10:06:50 2018 -0800

----------------------------------------------------------------------
 .../impl/ConfigBasedSourceResolverFactory.java  |  8 ++--
 .../sql/interfaces/SqlSystemStreamConfig.java   | 47 +++++++++++++++++-
 .../apache/samza/sql/planner/QueryPlanner.java  | 38 ++++++++++-----
 .../sql/runner/SamzaSqlApplicationConfig.java   | 23 +++++----
 .../sql/runner/SamzaSqlApplicationRunner.java   |  3 ++
 .../samza/sql/translator/QueryTranslator.java   |  5 +-
 .../samza/sql/translator/ScanTranslator.java    | 26 ++++------
 .../apache/samza/sql/TestQueryTranslator.java   |  2 +-
 .../samza/sql/testutil/SamzaSqlTestConfig.java  |  2 +-
 .../sql/testutil/TestSourceResolverFactory.java | 50 ++++++++++++++++++++
 10 files changed, 154 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
index 1203b25..a2d8b0c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedSourceResolverFactory.java
@@ -29,8 +29,8 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Schema Resolver that uses static config to return a config corresponding to a system stream.
- * Schemas are configured using the config of format {systemName}.{streamName}.schema.
+ * Source Resolver implementation that uses static config to return a config corresponding to a system stream.
+ * This Source resolver implementation supports sources of type {systemName}.{streamName}
  */
 public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
 
@@ -53,8 +53,10 @@ public class ConfigBasedSourceResolverFactory implements SourceResolverFactory {
     @Override
     public SqlSystemStreamConfig fetchSourceInfo(String source) {
       String[] sourceComponents = source.split("\\.");
+
+      // This source resolver expects sources of format {systemName}.{streamName}
       if (sourceComponents.length != 2) {
-        String msg = String.format("Source %s is not of the format <system>.<stream>", source);
+        String msg = String.format("Source %s is not of the format {systemName}.{streamName{", source);
         LOG.error(msg);
         throw new SamzaException(msg);
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
index df21784..d8965a4 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlSystemStreamConfig.java
@@ -19,8 +19,13 @@
 
 package org.apache.samza.sql.interfaces;
 
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.system.SystemStream;
 
 
@@ -38,18 +43,48 @@ public class SqlSystemStreamConfig {
 
   private final String samzaRelConverterName;
   private final SystemStream systemStream;
+
+  private final String source;
   private String relSchemaProviderName;
 
+  private Config config;
+
+  private List<String> sourceParts;
+
   public SqlSystemStreamConfig(String systemName, String streamName, Config systemConfig) {
+    this(systemName, streamName, Arrays.asList(systemName, streamName), systemConfig);
+  }
+
+  public SqlSystemStreamConfig(String systemName, String streamName, List<String> sourceParts,
+      Config systemConfig) {
 
+
+    HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
     this.systemName = systemName;
     this.streamName = streamName;
+    this.source = getSourceFromSourceParts(sourceParts);
+    this.sourceParts = sourceParts;
     this.systemStream = new SystemStream(systemName, streamName);
 
-    samzaRelConverterName = systemConfig.get(CFG_SAMZA_REL_CONVERTER);
-    relSchemaProviderName = systemConfig.get(CFG_REL_SCHEMA_PROVIDER);
+    samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
     Validate.notEmpty(samzaRelConverterName,
         String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
+
+    relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
+
+    // Removing the Samza SQL specific configs to get the remaining Samza configs.
+    streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
+    streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
+
+    config = new MapConfig(streamConfigs);
+  }
+
+  public static String getSourceFromSourceParts(List<String> sourceParts) {
+    return Joiner.on(".").join(sourceParts);
+  }
+
+  public List<String> getSourceParts() {
+    return sourceParts;
   }
 
   public String getSystemName() {
@@ -71,4 +106,12 @@ public class SqlSystemStreamConfig {
   public SystemStream getSystemStream() {
     return systemStream;
   }
+
+  public Config getConfig() {
+    return config;
+  }
+
+  public String getSource() {
+    return source;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index ce03ba3..061c03f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -23,7 +23,6 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -69,11 +68,16 @@ import org.slf4j.LoggerFactory;
 public class QueryPlanner {
   private static final Logger LOG = LoggerFactory.getLogger(QueryPlanner.class);
 
-  private final Map<String, SqlSystemStreamConfig> systemStreamConfigBySource;
   private final Collection<UdfMetadata> udfMetadata;
-  private final Map<SystemStream, RelSchemaProvider> relSchemaProviders;
 
-  public QueryPlanner(Map<SystemStream, RelSchemaProvider> relSchemaProviders,
+  // Mapping between the source to the RelSchemaProvider corresponding to the source.
+  private final Map<String, RelSchemaProvider> relSchemaProviders;
+
+  // Mapping between the source to the SqlSystemStreamConfig corresponding to the source.
+  private final Map<String, SqlSystemStreamConfig> systemStreamConfigBySource;
+
+
+  public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
       Map<String, SqlSystemStreamConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
     this.relSchemaProviders = relSchemaProviders;
     this.systemStreamConfigBySource = systemStreamConfigBySource;
@@ -85,16 +89,26 @@ public class QueryPlanner {
       Connection connection = DriverManager.getConnection("jdbc:calcite:");
       CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
       SchemaPlus rootSchema = calciteConnection.getRootSchema();
-      Map<String, SchemaPlus> systemSchemas = new HashMap<>();
 
       for (SqlSystemStreamConfig ssc : systemStreamConfigBySource.values()) {
-
-        RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSystemStream());
-        SchemaPlus systemSchema =
-            systemSchemas.computeIfAbsent(ssc.getSystemName(), s -> rootSchema.add(s, new AbstractSchema()));
-        RelDataType relationalSchema = relSchemaProvider.getRelationalSchema();
-
-        systemSchema.add(ssc.getStreamName(), createTableFromRelSchema(relationalSchema));
+        SchemaPlus previousLevelSchema = rootSchema;
+        List<String> sourceParts = ssc.getSourceParts();
+        RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());
+
+        for (String sourcePart : sourceParts) {
+          if (!sourcePart.equalsIgnoreCase(ssc.getStreamName())) {
+            SchemaPlus sourcePartSchema = rootSchema.getSubSchema(sourcePart);
+            if (sourcePartSchema == null) {
+              sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema());
+            }
+            previousLevelSchema = sourcePartSchema;
+          } else {
+            // If the source part is the streamName, then fetch the schema corresponding to the stream and register.
+            RelDataType relationalSchema = relSchemaProvider.getRelationalSchema();
+            previousLevelSchema.add(ssc.getStreamName(), createTableFromRelSchema(relationalSchema));
+            break;
+          }
+        }
       }
 
       List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()

http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index f029745..227a0f1 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -78,8 +78,8 @@ public class SamzaSqlApplicationConfig {
 
   public static final String CFG_UDF_RESOLVER = "samza.sql.udfResolver";
   public static final String CFG_FMT_UDF_RESOLVER_DOMAIN = "samza.sql.udfResolver.%s.";
-  private final Map<SystemStream, RelSchemaProvider> relSchemaProvidersBySystemStream;
-  private final Map<SystemStream, SamzaRelConverter> samzaRelConvertersBySystemStream;
+  private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
+  private final Map<String, SamzaRelConverter> samzaRelConvertersBySource;
 
   private SourceResolver sourceResolver;
   private UdfResolver udfResolver;
@@ -87,7 +87,6 @@ public class SamzaSqlApplicationConfig {
   private final Collection<UdfMetadata> udfMetadata;
 
   private final Map<String, SqlSystemStreamConfig> inputSystemStreamConfigBySource;
-
   private final Map<String, SqlSystemStreamConfig> outputSystemStreamConfigsBySource;
 
   private final List<String> sql;
@@ -117,17 +116,17 @@ public class SamzaSqlApplicationConfig {
         .collect(Collectors.toMap(Function.identity(), x -> sourceResolver.fetchSourceInfo(x)));
     systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
 
-    relSchemaProvidersBySystemStream = systemStreamConfigs.stream()
-        .collect(Collectors.toMap(SqlSystemStreamConfig::getSystemStream,
+    relSchemaProvidersBySource = systemStreamConfigs.stream()
+        .collect(Collectors.toMap(SqlSystemStreamConfig::getSource,
             x -> initializePlugin("RelSchemaProvider", x.getRelSchemaProviderName(), staticConfig,
                 CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
                 (o, c) -> ((RelSchemaProviderFactory) o).create(x.getSystemStream(), c))));
 
-    samzaRelConvertersBySystemStream = systemStreamConfigs.stream()
-        .collect(Collectors.toMap(SqlSystemStreamConfig::getSystemStream,
+    samzaRelConvertersBySource = systemStreamConfigs.stream()
+        .collect(Collectors.toMap(SqlSystemStreamConfig::getSource,
             x -> initializePlugin("SamzaRelConverter", x.getSamzaRelConverterName(), staticConfig,
                 CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, (o, c) -> ((SamzaRelConverterFactory) o).create(x.getSystemStream(),
-                    relSchemaProvidersBySystemStream.get(x.getSystemStream()), c))));
+                    relSchemaProvidersBySource.get(x.getSource()), c))));
   }
 
   private static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
@@ -235,11 +234,11 @@ public class SamzaSqlApplicationConfig {
     return outputSystemStreamConfigsBySource;
   }
 
-  public Map<SystemStream, SamzaRelConverter> getSamzaRelConverters() {
-    return samzaRelConvertersBySystemStream;
+  public Map<String, SamzaRelConverter> getSamzaRelConverters() {
+    return samzaRelConvertersBySource;
   }
 
-  public Map<SystemStream, RelSchemaProvider> getRelSchemaProviders() {
-    return relSchemaProvidersBySystemStream;
+  public Map<String, RelSchemaProvider> getRelSchemaProviders() {
+    return relSchemaProvidersBySource;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 57d889f..5215f7e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -85,10 +85,13 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
         SqlSystemStreamConfig inputSystemStreamConfig = sourceResolver.fetchSourceInfo(inputSource);
         newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
             inputSystemStreamConfig.getSystemName());
+        newConfig.putAll(inputSystemStreamConfig.getConfig());
       }
+
       SqlSystemStreamConfig outputSystemStreamConfig = sourceResolver.fetchSourceInfo(query.getOutputSource());
       newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()),
           outputSystemStreamConfig.getSystemName());
+      newConfig.putAll(outputSystemStreamConfig.getConfig());
     }
 
     if (localRunner) {

http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index ab17018..87e37f4 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -50,7 +50,8 @@ public class QueryTranslator {
 
   public QueryTranslator(SamzaSqlApplicationConfig sqlConfig) {
     this.sqlConfig = sqlConfig;
-    scanTranslator = new ScanTranslator(sqlConfig.getSamzaRelConverters());
+    scanTranslator =
+        new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource());
   }
 
   public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamGraph streamGraph) {
@@ -87,7 +88,7 @@ public class QueryTranslator {
 
     SqlSystemStreamConfig outputSystemConfig =
         sqlConfig.getOutputSystemStreamConfigsBySource().get(queryInfo.getOutputSource());
-    SamzaRelConverter samzaMsgConverter = sqlConfig.getSamzaRelConverters().get(outputSystemConfig.getSystemStream());
+    SamzaRelConverter samzaMsgConverter = sqlConfig.getSamzaRelConverters().get(queryInfo.getOutputSource());
     MessageStreamImpl<SamzaSqlRelMessage> stream =
         (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId());
     MessageStream<KV<Object, Object>> outputStream = stream.map(samzaMsgConverter::convertToSamzaMessage);

http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 202bdbd..30e5a9b 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -21,19 +21,14 @@ package org.apache.samza.sql.translator;
 
 import java.util.List;
 import java.util.Map;
-
 import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-
-import com.google.common.base.Joiner;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
 
 
 /**
@@ -42,25 +37,22 @@ import org.apache.samza.system.SystemStream;
  */
 public class ScanTranslator {
 
-  private final Map<SystemStream, SamzaRelConverter> relMsgConverters;
+  private final Map<String, SamzaRelConverter> relMsgConverters;
+  private final Map<String, SqlSystemStreamConfig> systemStreamConfig;
 
-  public ScanTranslator(Map<SystemStream, SamzaRelConverter> converters) {
+  public ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlSystemStreamConfig> ssc) {
     relMsgConverters = converters;
+    this.systemStreamConfig = ssc;
   }
 
   public void translate(final TableScan tableScan, final TranslatorContext context) {
     StreamGraph streamGraph = context.getStreamGraph();
     List<String> tableNameParts = tableScan.getTable().getQualifiedName();
-    Validate.isTrue(tableNameParts.size() == 2,
-        String.format("table name %s is not of the format <SystemName>.<StreamName>",
-            Joiner.on(".").join(tableNameParts)));
-
-    String streamName = tableNameParts.get(1);
-    String systemName = tableNameParts.get(0);
-    SystemStream systemStream = new SystemStream(systemName, streamName);
+    String sourceName = SqlSystemStreamConfig.getSourceFromSourceParts(tableNameParts);
 
-    Validate.isTrue(relMsgConverters.containsKey(systemStream), String.format("Unknown system %s", systemName));
-    SamzaRelConverter converter = relMsgConverters.get(systemStream);
+    Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName));
+    SamzaRelConverter converter = relMsgConverters.get(sourceName);
+    String streamName = systemStreamConfig.get(sourceName).getStreamName();
 
     MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream(streamName);
     MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(converter::convertToRelMessage);

http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
index 7fa9974..04fdec5 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java
@@ -39,7 +39,7 @@ public class TestQueryTranslator {
   public void testTranslate() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
-        "Insert into testavro.outputTopic select MyTest(id) from testavro.SIMPLE1");
+        "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);

http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 008eb82..92766f6 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -59,7 +59,7 @@ public class SamzaSqlTestConfig {
     String configSourceResolverDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
     staticConfigs.put(configSourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
-        ConfigBasedSourceResolverFactory.class.getName());
+        TestSourceResolverFactory.class.getName());
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
     String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");

http://git-wip-us.apache.org/repos/asf/samza/blob/fcd2b9a5/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
new file mode 100644
index 0000000..b9cf803
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java
@@ -0,0 +1,50 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.testutil;
+
+import java.util.Arrays;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.interfaces.SourceResolver;
+import org.apache.samza.sql.interfaces.SourceResolverFactory;
+import org.apache.samza.sql.interfaces.SqlSystemStreamConfig;
+
+
+public class TestSourceResolverFactory implements SourceResolverFactory {
+  @Override
+  public SourceResolver create(Config config) {
+    return new TestSourceResolver(config);
+  }
+
+  private class TestSourceResolver implements SourceResolver {
+    private final Config config;
+
+    public TestSourceResolver(Config config) {
+      this.config = config;
+    }
+
+    @Override
+    public SqlSystemStreamConfig fetchSourceInfo(String sourceName) {
+      String[] sourceComponents = sourceName.split("\\.");
+      Config systemConfigs = config.subset(sourceComponents[0] + ".");
+      return new SqlSystemStreamConfig(sourceComponents[0], sourceComponents[sourceComponents.length - 1],
+          Arrays.asList(sourceComponents), systemConfigs);
+    }
+  }
+}