Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:09:59 UTC

[13/20] storm git commit: [StormSQL] Refactor to support compiling StormSQL to Trident topology.

[StormSQL] Refactor to support compiling StormSQL to Trident topology.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/82f10ebf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/82f10ebf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/82f10ebf

Branch: refs/heads/master
Commit: 82f10ebfd00ba71b5c95b95e917d856c5eff9c9f
Parents: 4d8cc41
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Nov 6 15:50:31 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:28 2015 -0800

----------------------------------------------------------------------
 external/sql/storm-sql-core/pom.xml             |  7 ++
 .../apache/storm/sql/DataSourcesProvider.java   | 46 ----------
 .../apache/storm/sql/DataSourcesRegistry.java   | 65 --------------
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  | 13 +--
 .../apache/storm/sql/compiler/CompilerUtil.java |  2 +-
 .../apache/storm/sql/compiler/ExprCompiler.java |  4 +-
 .../apache/storm/sql/compiler/PlanCompiler.java |  5 +-
 .../sql/compiler/PostOrderRelNodeVisitor.java   | 51 +++++------
 .../storm/sql/compiler/RelNodeCompiler.java     | 19 +---
 .../test/org/apache/storm/sql/TestStormSql.java | 13 +--
 .../storm/sql/compiler/TestCompilerUtils.java   | 46 ++++++++++
 .../storm/sql/compiler/TestExprCompiler.java    |  6 +-
 .../storm/sql/compiler/TestExprSemantic.java    |  3 +-
 .../storm/sql/compiler/TestPlanCompiler.java    |  5 +-
 .../storm/sql/compiler/TestRelNodeCompiler.java |  4 +-
 .../apache/storm/sql/compiler/TestUtils.java    | 92 --------------------
 external/sql/storm-sql-runtime/pom.xml          |  1 -
 .../storm/sql/runtime/DataSourcesProvider.java  | 50 +++++++++++
 .../storm/sql/runtime/DataSourcesRegistry.java  | 80 +++++++++++++++++
 .../org/apache/storm/sql/runtime/FieldInfo.java | 45 ++++++++++
 .../sql/runtime/ISqlTridentDataSource.java      | 29 ++++++
 .../test/org/apache/storm/sql/TestUtils.java    | 69 +++++++++++++++
 22 files changed, 382 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml
index bcace6c..0713a61 100644
--- a/external/sql/storm-sql-core/pom.xml
+++ b/external/sql/storm-sql-core/pom.xml
@@ -49,6 +49,13 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
             <groupId>org.apache.calcite</groupId>
             <artifactId>calcite-core</artifactId>
             <version>${calcite.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
deleted file mode 100644
index 46bfa40..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesProvider.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql;
-
-import org.apache.storm.sql.runtime.DataSource;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-
-public interface DataSourcesProvider {
-  /**
-   * @return the scheme of the data source
-   */
-  String scheme();
-
-  /**
-   * Construct a new data source.
-   * @param uri The URI that specifies the data source. The format of the URI
-   *            is fully customizable.
-   * @param inputFormatClass the name of the class that deserializes data.
-   *                         It is null when unspecified.
-   * @param outputFormatClass the name of the class that serializes data. It
-   *                          is null when unspecified.
-   * @param fields The name of the fields and the schema of the table.
-   */
-  DataSource construct(
-      URI uri, String inputFormatClass, String outputFormatClass,
-      List<Map.Entry<String, Class<?>>> fields);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
deleted file mode 100644
index b45d039..0000000
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/DataSourcesRegistry.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.storm.sql.runtime.DataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-
-public class DataSourcesRegistry {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      DataSourcesRegistry.class);
-  private static final Map<String, DataSourcesProvider> providers;
-
-  static {
-    providers = new HashMap<>();
-    ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
-        DataSourcesProvider.class);
-    for (DataSourcesProvider p : loader) {
-      LOG.info("Registering scheme {} with {}", p.scheme(), p);
-      providers.put(p.scheme(), p);
-    }
-  }
-
-  private DataSourcesRegistry() {
-  }
-
-  public static DataSource construct(
-      URI uri, String inputFormatClass, String outputFormatClass,
-      List<Map.Entry<String, Class<?>>> fields) {
-    DataSourcesProvider provider = providers.get(uri.getScheme());
-    if (provider == null) {
-      return null;
-    }
-
-    return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
-  }
-
-  @VisibleForTesting
-  static Map<String, DataSourcesProvider> providerMap() {
-    return providers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index 384b4fa..d951243 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -29,12 +29,11 @@ import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
 import org.apache.storm.sql.compiler.PlanCompiler;
+import org.apache.storm.sql.parser.ColumnConstraint;
 import org.apache.storm.sql.parser.ColumnDefinition;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.*;
 
 import java.util.AbstractMap;
 import java.util.ArrayList;
@@ -76,14 +75,16 @@ class StormSqlImpl extends StormSql {
   private void handleCreateTable(
       SqlCreateTable n, Map<String, DataSource> dataSources) {
     TableBuilderInfo builder = new TableBuilderInfo(typeFactory);
-    List<Map.Entry<String, Class<?>>> fields = new ArrayList<>();
+    List<FieldInfo> fields = new ArrayList<>();
     for (ColumnDefinition col : n.fieldList()) {
       builder.field(col.name(), col.type());
       RelDataType dataType = col.type().deriveType(typeFactory);
       Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType);
-      fields.add(new AbstractMap.SimpleImmutableEntry<String, Class<?>>
-                     (col.name(), javaType));
+      ColumnConstraint constraint = col.constraint();
+      boolean isPrimary = constraint != null && constraint instanceof ColumnConstraint.PrimaryKey;
+      fields.add(new FieldInfo(col.name(), javaType, isPrimary));
     }
+
     Table table = builder.build();
     schema.add(n.tableName(), table);
     DataSource ds = DataSourcesRegistry.construct(n.location(), n

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
index 1a48052..1ef1cb7 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -33,7 +33,7 @@ import org.apache.calcite.util.Util;
 import java.util.ArrayList;
 
 public class CompilerUtil {
-  static String escapeJavaString(String s, boolean nullMeansNull) {
+  public static String escapeJavaString(String s, boolean nullMeansNull) {
       if(s == null) {
         return nullMeansNull ? "null" : "\"\"";
       } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index 77fdf0c..01024f0 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -48,13 +48,13 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.*;
 /**
  * Compile RexNode on top of the Tuple abstraction.
  */
-class ExprCompiler implements RexVisitor<String> {
+public class ExprCompiler implements RexVisitor<String> {
   private final PrintWriter pw;
   private final JavaTypeFactory typeFactory;
   private static final ImpTable IMP_TABLE = new ImpTable();
   private int nameCount;
 
-  ExprCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
+  public ExprCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
     this.pw = pw;
     this.typeFactory = typeFactory;
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
index 1096f5b..d2d3710 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
@@ -96,9 +96,8 @@ public class PlanCompiler {
     RelNode n;
     while ((n = q.poll()) != null) {
       pw.print(
-          String.format("    ChannelContext CTX_%d = Channels.chain(%2$s, " +
-                            "%3$s);\n", n.getId(), lastCtx, RelNodeCompiler
-              .getStageName(n)));
+          String.format("    ChannelContext CTX_%d = Channels.chain(%2$s, %3$s);\n",
+              n.getId(), lastCtx, RelNodeCompiler.getStageName(n)));
       lastCtx = String.format("CTX_%d", n.getId());
 
       if (n instanceof TableScan) {

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
index 6277e28..bb7c8d1 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
@@ -18,22 +18,11 @@
 package org.apache.storm.sql.compiler;
 
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.Calc;
-import org.apache.calcite.rel.core.Collect;
-import org.apache.calcite.rel.core.Correlate;
-import org.apache.calcite.rel.core.Exchange;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.Sample;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.core.Uncollect;
-import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.core.*;
 import org.apache.calcite.rel.stream.Delta;
 
-abstract class PostOrderRelNodeVisitor<T> {
-  final T traverse(RelNode n) throws Exception {
+public abstract class PostOrderRelNodeVisitor<T> {
+  public final T traverse(RelNode n) throws Exception {
     for (RelNode input : n.getInputs()) {
       traverse(input);
     }
@@ -58,6 +47,8 @@ abstract class PostOrderRelNodeVisitor<T> {
       return visitSample((Sample) n);
     } else if (n instanceof Sort) {
       return visitSort((Sort) n);
+    } else if (n instanceof TableModify) {
+      return visitTableModify((TableModify) n);
     } else if (n instanceof TableScan) {
       return visitTableScan((TableScan) n);
     } else if (n instanceof Uncollect) {
@@ -69,59 +60,63 @@ abstract class PostOrderRelNodeVisitor<T> {
     }
   }
 
-  T visitAggregate(Aggregate aggregate) throws Exception {
+  public T visitAggregate(Aggregate aggregate) throws Exception {
     return defaultValue(aggregate);
   }
 
-  T visitCalc(Calc calc) throws Exception {
+  public T visitCalc(Calc calc) throws Exception {
     return defaultValue(calc);
   }
 
-  T visitCollect(Collect collect) throws Exception {
+  public T visitCollect(Collect collect) throws Exception {
     return defaultValue(collect);
   }
 
-  T visitCorrelate(Correlate correlate) throws Exception {
+  public T visitCorrelate(Correlate correlate) throws Exception {
     return defaultValue(correlate);
   }
 
-  T visitDelta(Delta delta) throws Exception {
+  public T visitDelta(Delta delta) throws Exception {
     return defaultValue(delta);
   }
 
-  T visitExchange(Exchange exchange) throws Exception {
+  public T visitExchange(Exchange exchange) throws Exception {
     return defaultValue(exchange);
   }
 
-  T visitProject(Project project) throws Exception {
+  public T visitProject(Project project) throws Exception {
     return defaultValue(project);
   }
 
-  T visitFilter(Filter filter) throws Exception {
+  public T visitFilter(Filter filter) throws Exception {
     return defaultValue(filter);
   }
 
-  T visitSample(Sample sample) throws Exception {
+  public T visitSample(Sample sample) throws Exception {
     return defaultValue(sample);
   }
 
-  T visitSort(Sort sort) throws Exception {
+  public T visitSort(Sort sort) throws Exception {
     return defaultValue(sort);
   }
 
-  T visitTableScan(TableScan scan) throws Exception {
+  public T visitTableModify(TableModify modify) throws Exception {
+    return defaultValue(modify);
+  }
+
+  public T visitTableScan(TableScan scan) throws Exception {
     return defaultValue(scan);
   }
 
-  T visitUncollect(Uncollect uncollect) throws Exception {
+  public T visitUncollect(Uncollect uncollect) throws Exception {
     return defaultValue(uncollect);
   }
 
-  T visitWindow(Window window) throws Exception {
+  public T visitWindow(Window window) throws Exception {
     return defaultValue(window);
   }
 
-  T defaultValue(RelNode n) {
+  public T defaultValue(RelNode n) {
     return null;
   }
 }
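
For context on the hunk above: the refactoring widens PostOrderRelNodeVisitor and its visit methods to public and adds a visitTableModify hook, so relational-node compilers outside this package (e.g. a Trident plan compiler, per the commit subject) can subclass it and handle INSERT targets. A minimal sketch of such a subclass; the class name is illustrative only and not part of this commit:

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.core.TableScan;
import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;

public class StageNamePrinter extends PostOrderRelNodeVisitor<String> {
  @Override
  public String visitTableScan(TableScan scan) {
    return "scan:" + scan.getTable().getQualifiedName();
  }

  @Override
  public String visitTableModify(TableModify modify) {
    // TableModify covers DML nodes such as INSERT INTO, i.e. the sink side of a
    // streaming query; before this commit the visitor had no hook for it.
    return "modify:" + modify.getTable().getQualifiedName();
  }

  @Override
  public String defaultValue(RelNode n) {
    return n.getRelTypeName();
  }
}

// Usage (traverse is post-order and returns the value computed for the root node):
//   String label = new StageNamePrinter().traverse(relTree);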

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
index 5a21fba2..eea451f 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
@@ -25,9 +25,6 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
 
 import java.io.PrintWriter;
-import java.util.Collections;
-import java.util.Set;
-import java.util.TreeSet;
 
 /**
  * Compile RelNodes into individual functions.
@@ -45,19 +42,13 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
     ""
   );
 
-  public Set<String> getReferredTables() {
-    return Collections.unmodifiableSet(referredTables);
-  }
-
-  private final Set<String> referredTables = new TreeSet<>();
-
   RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
     this.pw = pw;
     this.typeFactory = typeFactory;
   }
 
   @Override
-  Void visitFilter(Filter filter) throws Exception {
+  public Void visitFilter(Filter filter) throws Exception {
     beginStage(filter);
     ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
     String r = filter.getCondition().accept(compiler);
@@ -67,7 +58,7 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   }
 
   @Override
-  Void visitProject(Project project) throws Exception {
+  public Void visitProject(Project project) throws Exception {
     beginStage(project);
     ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
 
@@ -84,14 +75,12 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   }
 
   @Override
-  Void defaultValue(RelNode n) {
+  public Void defaultValue(RelNode n) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  Void visitTableScan(TableScan scan) throws Exception {
-    String tableName = Joiner.on('_').join(scan.getTable().getQualifiedName());
-    referredTables.add(tableName);
+  public Void visitTableScan(TableScan scan) throws Exception {
     beginStage(scan);
     pw.print("    ctx.emit(_data);\n");
     endStage();

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index e18b9f8..9facd8a 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -18,9 +18,7 @@
 package org.apache.storm.sql;
 
 import backtype.storm.tuple.Values;
-import org.apache.storm.sql.compiler.TestUtils;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.*;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -29,7 +27,6 @@ import org.junit.Test;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 public class TestStormSql {
   private static class MockDataSourceProvider implements DataSourcesProvider {
@@ -41,9 +38,15 @@ public class TestStormSql {
     @Override
     public DataSource construct(
         URI uri, String inputFormatClass, String outputFormatClass,
-        List<Map.Entry<String, Class<?>>> fields) {
+        List<FieldInfo> fields) {
       return new TestUtils.MockDataSource();
     }
+
+    @Override
+    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+         List<FieldInfo> fields) {
+      throw new UnsupportedOperationException();
+    }
   }
 
   @BeforeClass

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
new file mode 100644
index 0000000..c582fdc
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -0,0 +1,46 @@
+package org.apache.storm.sql.compiler;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.*;
+
+public class TestCompilerUtils {
+  public static CalciteState sqlOverDummyTable(String sql)
+      throws RelConversionException, ValidationException, SqlParseException {
+    SchemaPlus schema = Frameworks.createRootSchema(true);
+    JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+        (RelDataTypeSystem.DEFAULT);
+    Table table = new CompilerUtil.TableBuilderInfo(typeFactory)
+        .field("ID", SqlTypeName.INTEGER).build();
+    schema.add("FOO", table);
+    schema.add("BAR", table);
+    FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+        schema).build();
+    Planner planner = Frameworks.getPlanner(config);
+    SqlNode parse = planner.parse(sql);
+    SqlNode validate = planner.validate(parse);
+    RelNode tree = planner.convert(validate);
+    return new CalciteState(schema, tree);
+  }
+
+  public static class CalciteState {
+    final SchemaPlus schema;
+    final RelNode tree;
+
+    private CalciteState(SchemaPlus schema, RelNode tree) {
+      this.schema = schema;
+      this.tree = tree;
+    }
+
+    public SchemaPlus schema() { return schema; }
+    public RelNode tree() { return tree; }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
index a5f9d67..017aa25 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
@@ -37,7 +37,7 @@ public class TestExprCompiler {
   @Test
   public void testLiteral() throws Exception {
     String sql = "SELECT 1,1.0,TRUE,'FOO' FROM FOO";
-    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
     LogicalProject project = (LogicalProject) state.tree;
     String[] res = new String[project.getChildExps().size()];
@@ -55,7 +55,7 @@ public class TestExprCompiler {
   @Test
   public void testInputRef() throws Exception {
     String sql = "SELECT ID FROM FOO";
-    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
     LogicalProject project = (LogicalProject) state.tree;
     StringWriter sw = new StringWriter();
@@ -70,7 +70,7 @@ public class TestExprCompiler {
   @Test
   public void testCallExpr() throws Exception {
     String sql = "SELECT 1>2, 3+5, 1-1.0, 3+ID FROM FOO";
-    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
     LogicalProject project = (LogicalProject) state.tree;
     String[] res = new String[project.getChildExps().size()];

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
index 1d98664..40bb884 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.TestUtils;
 import org.apache.storm.sql.runtime.ChannelHandler;
 import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.AbstractValuesProcessor;
@@ -124,7 +125,7 @@ public class TestExprSemantic {
   private Values testExpr(List<String> exprs) throws Exception {
     String sql = "SELECT " + Joiner.on(',').join(exprs) + " FROM FOO" +
         " WHERE ID > 0 AND ID < 2";
-    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     PlanCompiler compiler = new PlanCompiler(typeFactory);
     AbstractValuesProcessor proc = compiler.compile(state.tree);
     Map<String, DataSource> data = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
index d32fdca..2b3a2d3 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
@@ -21,6 +21,7 @@ import backtype.storm.tuple.Values;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.TestUtils;
 import org.apache.storm.sql.runtime.ChannelHandler;
 import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.AbstractValuesProcessor;
@@ -39,7 +40,7 @@ public class TestPlanCompiler {
   @Test
   public void testCompile() throws Exception {
     String sql = "SELECT ID + 1 FROM FOO WHERE ID > 2";
-    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     PlanCompiler compiler = new PlanCompiler(typeFactory);
     AbstractValuesProcessor proc = compiler.compile(state.tree);
     Map<String, DataSource> data = new HashMap<>();
@@ -54,7 +55,7 @@ public class TestPlanCompiler {
   @Test
   public void testLogicalExpr() throws Exception {
     String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND ID < 1) FROM FOO WHERE ID > 0 AND ID < 2";
-    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     PlanCompiler compiler = new PlanCompiler(typeFactory);
     AbstractValuesProcessor proc = compiler.compile(state.tree);
     Map<String, DataSource> data = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
index 623a2f4..99083cb 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
@@ -19,11 +19,9 @@ package org.apache.storm.sql.compiler;
 
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -36,7 +34,7 @@ public class TestRelNodeCompiler {
   @Test
   public void testFilter() throws Exception {
     String sql = "SELECT ID + 1 FROM FOO WHERE ID > 3";
-    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
         RelDataTypeSystem.DEFAULT);
     LogicalProject project = (LogicalProject) state.tree;

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
deleted file mode 100644
index 5aa4cb0..0000000
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.storm.sql.compiler;
-
-import backtype.storm.tuple.Values;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.apache.storm.sql.runtime.ChannelContext;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TestUtils {
-  static CalciteState sqlOverDummyTable(String sql)
-      throws RelConversionException, ValidationException, SqlParseException {
-    SchemaPlus schema = Frameworks.createRootSchema(true);
-    JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
-        (RelDataTypeSystem.DEFAULT);
-    Table table = new CompilerUtil.TableBuilderInfo(typeFactory)
-        .field("ID", SqlTypeName.INTEGER).build();
-    schema.add("FOO", table);
-    FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
-        schema).build();
-    Planner planner = Frameworks.getPlanner(config);
-    SqlNode parse = planner.parse(sql);
-    SqlNode validate = planner.validate(parse);
-    RelNode tree = planner.convert(validate);
-    return new CalciteState(schema, tree);
-  }
-
-  static class CalciteState {
-    final SchemaPlus schema;
-    final RelNode tree;
-
-    private CalciteState(SchemaPlus schema, RelNode tree) {
-      this.schema = schema;
-      this.tree = tree;
-    }
-  }
-
-  public static class MockDataSource implements DataSource {
-    private final ArrayList<Values> RECORDS = new ArrayList<>();
-
-    public MockDataSource() {
-      for (int i = 0; i < 5; ++i) {
-        RECORDS.add(new Values(i));
-      }
-    }
-
-    @Override
-    public void open(ChannelContext ctx) {
-      for (Values v : RECORDS) {
-        ctx.emit(v);
-      }
-      ctx.fireChannelInactive();
-    }
-  }
-
-  public static class CollectDataChannelHandler implements ChannelHandler {
-    private final List<Values> values;
-
-    public CollectDataChannelHandler(List<Values> values) {
-      this.values = values;
-    }
-
-    @Override
-    public void dataReceived(ChannelContext ctx, Values data) {
-      values.add(data);
-    }
-
-    @Override
-    public void channelInactive(ChannelContext ctx) {}
-
-    @Override
-    public void exceptionCaught(Throwable cause) {
-      throw new RuntimeException(cause);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
index 62f2d95..a8ba4dc 100644
--- a/external/sql/storm-sql-runtime/pom.xml
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -45,7 +45,6 @@
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
new file mode 100644
index 0000000..eaabc8d
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
@@ -0,0 +1,50 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+public interface DataSourcesProvider {
+  /**
+   * @return the scheme of the data source
+   */
+  String scheme();
+
+  /**
+   * Construct a new data source.
+   * @param uri The URI that specifies the data source. The format of the URI
+   *            is fully customizable.
+   * @param inputFormatClass the name of the class that deserializes data.
+   *                         It is null when unspecified.
+   * @param outputFormatClass the name of the class that serializes data. It
+   *                          is null when unspecified.
+   * @param fields The name of the fields and the schema of the table.
+   */
+  DataSource construct(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<FieldInfo> fields);
+
+  ISqlTridentDataSource constructTrident(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<FieldInfo> fields);
+}
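
A hedged sketch of what a provider implementing the relocated interface might look like; the class name and the "mem" scheme are illustrative only and not part of this commit. Providers are discovered via java.util.ServiceLoader (see DataSourcesRegistry below), so an implementation like this would also be listed in META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider.

import org.apache.storm.sql.runtime.ChannelContext;
import org.apache.storm.sql.runtime.DataSource;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;

import java.net.URI;
import java.util.List;

public class MemDataSourcesProvider implements DataSourcesProvider {
  @Override
  public String scheme() {
    return "mem";
  }

  @Override
  public DataSource construct(URI uri, String inputFormatClass,
                              String outputFormatClass, List<FieldInfo> fields) {
    // Standalone path: a real provider would pull records from the system named
    // by the URI and push them into the channel before closing it.
    return new DataSource() {
      @Override
      public void open(ChannelContext ctx) {
        ctx.fireChannelInactive();   // nothing to emit in this sketch
      }
    };
  }

  @Override
  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass,
                                                String outputFormatClass, List<FieldInfo> fields) {
    // Trident path introduced by this refactoring; a sketch of an
    // ISqlTridentDataSource implementation follows that interface further down.
    throw new UnsupportedOperationException("not sketched here");
  }
}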

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
new file mode 100644
index 0000000..0285c97
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -0,0 +1,80 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+public class DataSourcesRegistry {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DataSourcesRegistry.class);
+  private static final Map<String, DataSourcesProvider> providers;
+
+  static {
+    providers = new HashMap<>();
+    ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
+        DataSourcesProvider.class);
+    for (DataSourcesProvider p : loader) {
+      LOG.info("Registering scheme {} with {}", p.scheme(), p);
+      providers.put(p.scheme(), p);
+    }
+  }
+
+  private DataSourcesRegistry() {
+  }
+
+  public static DataSource construct(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<FieldInfo> fields) {
+    DataSourcesProvider provider = providers.get(uri.getScheme());
+    if (provider == null) {
+      return null;
+    }
+
+    return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
+  }
+
+  public static ISqlTridentDataSource constructTridentDataSource(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<FieldInfo> fields) {
+    DataSourcesProvider provider = providers.get(uri.getScheme());
+    if (provider == null) {
+      return null;
+    }
+
+    return provider.constructTrident(uri, inputFormatClass, outputFormatClass, fields);
+  }
+
+  /**
+   * Allow unit tests to inject data sources.
+   */
+  @VisibleForTesting
+  public static Map<String, DataSourcesProvider> providerMap() {
+    return providers;
+  }
+}
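
A usage sketch for the registry's new Trident entry point; the "kafka" scheme, host, and field layout are assumptions for illustration, and no such provider ships with this commit. The lookup returns null when no DataSourcesProvider on the classpath has registered itself for the URI's scheme.

import org.apache.storm.sql.runtime.DataSourcesRegistry;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;

import java.net.URI;
import java.util.Arrays;
import java.util.List;

public class RegistryLookupSketch {
  public static void main(String[] args) {
    // Schema of the hypothetical table: ID is the primary key.
    List<FieldInfo> fields = Arrays.asList(
        new FieldInfo("ID", Integer.class, true),
        new FieldInfo("NAME", String.class, false));

    ISqlTridentDataSource source = DataSourcesRegistry.constructTridentDataSource(
        URI.create("kafka://localhost:9092/orders"), null, null, fields);

    if (source == null) {
      System.err.println("No DataSourcesProvider registered for scheme 'kafka'");
    }
  }
}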

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
new file mode 100644
index 0000000..cb1176b
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+/**
+ * Describe each column of the field
+ */
+public class FieldInfo {
+  private final String name;
+  private final Class<?> type;
+  private final boolean isPrimary;
+
+  public FieldInfo(String name, Class<?> type, boolean isPrimary) {
+    this.name = name;
+    this.type = type;
+    this.isPrimary = isPrimary;
+  }
+
+  public String name() {
+    return name;
+  }
+
+  public Class<?> type() {
+    return type;
+  }
+
+  public boolean isPrimary() {
+    return isPrimary;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
new file mode 100644
index 0000000..4b2a915
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import storm.trident.operation.Function;
+import storm.trident.spout.IBatchSpout;
+
+/**
+ * A ISqlTridentDataSource specifies how an external data source produces and consumes data.
+ */
+public interface ISqlTridentDataSource {
+  IBatchSpout getProducer();
+  Function getConsumer();
+}
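
A minimal sketch of implementing ISqlTridentDataSource, assuming an in-memory source built on Storm's FixedBatchSpout test spout; the class name and field layout are illustrative, not part of this commit.

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.tuple.TridentTuple;

public class InMemoryTridentDataSource implements ISqlTridentDataSource {
  @Override
  public IBatchSpout getProducer() {
    // Producer side: a fixed stream of single-field tuples named "ID".
    return new FixedBatchSpout(new Fields("ID"), 5,
        new Values(0), new Values(1), new Values(2), new Values(3), new Values(4));
  }

  @Override
  public Function getConsumer() {
    // Consumer side: a Trident Function acting as the sink; this sketch simply
    // re-emits each tuple instead of writing to an external system.
    return new EchoFunction();
  }

  private static class EchoFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
      collector.emit(tuple.getValues());
    }
  }
}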

http://git-wip-us.apache.org/repos/asf/storm/blob/82f10ebf/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
new file mode 100644
index 0000000..82347ef
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -0,0 +1,69 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.runtime.ChannelContext;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestUtils {
+  public static class MockDataSource implements DataSource {
+    private final ArrayList<Values> RECORDS = new ArrayList<>();
+
+    public MockDataSource() {
+      for (int i = 0; i < 5; ++i) {
+        RECORDS.add(new Values(i));
+      }
+    }
+
+    @Override
+    public void open(ChannelContext ctx) {
+      for (Values v : RECORDS) {
+        ctx.emit(v);
+      }
+      ctx.fireChannelInactive();
+    }
+  }
+
+  public static class CollectDataChannelHandler implements ChannelHandler {
+    private final List<Values> values;
+
+    public CollectDataChannelHandler(List<Values> values) {
+      this.values = values;
+    }
+
+    @Override
+    public void dataReceived(ChannelContext ctx, Values data) {
+      values.add(data);
+    }
+
+    @Override
+    public void channelInactive(ChannelContext ctx) {}
+
+    @Override
+    public void exceptionCaught(Throwable cause) {
+      throw new RuntimeException(cause);
+    }
+  }
+}