You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:09:56 UTC

[10/20] storm git commit: [StormSQL] Compile logical plans to Java code.

[StormSQL] Compile logical plans to Java code.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0a257f12
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0a257f12
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0a257f12

Branch: refs/heads/master
Commit: 0a257f12527a4aec6c477147ff8390ac08c92136
Parents: bd4f6dc
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Oct 22 16:52:59 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:02 2015 -0800

----------------------------------------------------------------------
 external/sql/pom.xml                            |   1 +
 external/sql/storm-sql-core/pom.xml             |  11 +
 .../apache/storm/sql/compiler/CompilerUtil.java |  95 +++++++-
 .../apache/storm/sql/compiler/PlanCompiler.java | 131 +++++++++++
 .../storm/sql/compiler/RelNodeCompiler.java     |  45 ++--
 .../storm/sql/javac/CompilingClassLoader.java   | 221 +++++++++++++++++++
 .../storm/sql/compiler/TestPlanCompiler.java    |  53 +++++
 .../storm/sql/compiler/TestRelNodeCompiler.java |  10 -
 .../apache/storm/sql/compiler/TestUtils.java    | 125 ++++-------
 external/sql/storm-sql-runtime/pom.xml          |  65 ++++++
 .../storm/sql/storm/AbstractChannelHandler.java |  35 +++
 .../apache/storm/sql/storm/ChannelContext.java  |  28 +++
 .../apache/storm/sql/storm/ChannelHandler.java  |  37 ++++
 .../org/apache/storm/sql/storm/Channels.java    |  78 +++++++
 .../org/apache/storm/sql/storm/DataSource.java  |  27 +++
 .../storm/runtime/AbstractValuesProcessor.java  |  29 +++
 16 files changed, 881 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 73e7b31..e4dae94 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -38,5 +38,6 @@
 
     <modules>
         <module>storm-sql-core</module>
+        <module>storm-sql-runtime</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml
index 3ca1ced..bcace6c 100644
--- a/external/sql/storm-sql-core/pom.xml
+++ b/external/sql/storm-sql-core/pom.xml
@@ -43,6 +43,12 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.calcite</groupId>
             <artifactId>calcite-core</artifactId>
             <version>${calcite.version}</version>
@@ -52,6 +58,11 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src/jvm</sourceDirectory>

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
index 5e7453a..1a48052 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -17,9 +17,22 @@
  */
 package org.apache.storm.sql.compiler;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 
-class CompilerUtil {
+import java.util.ArrayList;
+
+public class CompilerUtil {
   static String escapeJavaString(String s, boolean nullMeansNull) {
       if(s == null) {
         return nullMeansNull ? "null" : "\"\"";
@@ -32,4 +45,84 @@ class CompilerUtil {
         return "\"" + s5 + "\"";
       }
   }
+
+  public static class TableBuilderInfo {
+    private final RelDataTypeFactory typeFactory;
+
+    public TableBuilderInfo(RelDataTypeFactory typeFactory) {
+      this.typeFactory = typeFactory;
+    }
+
+    private static class FieldType {
+      private final String name;
+      private final RelDataType relDataType;
+
+      private FieldType(String name, RelDataType relDataType) {
+        this.name = name;
+        this.relDataType = relDataType;
+      }
+
+    }
+
+    private final ArrayList<FieldType> fields = new ArrayList<>();
+    private final ArrayList<Object[]> rows = new ArrayList<>();
+    private Statistic stats;
+
+    public TableBuilderInfo field(String name, SqlTypeName type) {
+      RelDataType dataType = typeFactory.createSqlType(type);
+      fields.add(new FieldType(name, dataType));
+      return this;
+    }
+
+    public TableBuilderInfo field(String name, SqlTypeName type, int
+        precision) {
+      RelDataType dataType = typeFactory.createSqlType(type, precision);
+      fields.add(new FieldType(name, dataType));
+      return this;
+    }
+
+    public TableBuilderInfo field(
+        String name, SqlDataTypeSpec type) {
+      RelDataType dataType = type.deriveType(typeFactory);
+      fields.add(new FieldType(name, dataType));
+      return this;
+    }
+
+    public TableBuilderInfo statistics(Statistic stats) {
+      this.stats = stats;
+      return this;
+    }
+
+    @VisibleForTesting
+    public TableBuilderInfo rows(Object[] data) {
+      rows.add(data);
+      return this;
+    }
+
+    public Table build() {
+      final Statistic stat = stats;
+      return new Table() {
+        @Override
+        public RelDataType getRowType(
+            RelDataTypeFactory relDataTypeFactory) {
+          RelDataTypeFactory.FieldInfoBuilder b = relDataTypeFactory.builder();
+          for (FieldType f : fields) {
+            b.add(f.name, f.relDataType);
+          }
+          return b.build();
+        }
+
+        @Override
+        public Statistic getStatistic() {
+          return stat != null ? stat : Statistics.of(rows.size(),
+                                                     ImmutableList.<ImmutableBitSet>of());
+        }
+
+        @Override
+        public Schema.TableType getJdbcTableType() {
+          return Schema.TableType.TABLE;
+        }
+      };
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
new file mode 100644
index 0000000..d006261
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+
+public class PlanCompiler {
+  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
+  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
+  private static final String PROLOGUE = NEW_LINE_JOINER.join(
+      "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
+      "import java.util.Iterator;", "import java.util.Map;",
+      "import backtype.storm.tuple.Values;",
+      "import org.apache.storm.sql.storm.AbstractChannelHandler;",
+      "import org.apache.storm.sql.storm.Channels;",
+      "import org.apache.storm.sql.storm.ChannelContext;",
+      "import org.apache.storm.sql.storm.ChannelHandler;",
+      "import org.apache.storm.sql.storm.DataSource;",
+      "import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;",
+      "public final class Processor extends AbstractValuesProcessor {", "");
+  private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
+      "  @Override",
+      "  public void initialize(Map<String, DataSource> data,",
+      "                         ChannelHandler result) {",
+      "    ChannelContext r = Channels.chain(Channels.voidContext(), result);",
+      ""
+  );
+
+  private final JavaTypeFactory typeFactory;
+
+  public PlanCompiler(JavaTypeFactory typeFactory) {
+    this.typeFactory = typeFactory;
+  }
+
+  private String generateJavaSource(RelNode root) throws Exception {
+    StringWriter sw = new StringWriter();
+    try (PrintWriter pw = new PrintWriter(sw)) {
+      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+      printPrologue(pw);
+      compiler.traverse(root);
+      printMain(pw, root);
+      printEpilogue(pw);
+    }
+    return sw.toString();
+  }
+
+  private void printMain(PrintWriter pw, RelNode root) {
+    Set<TableScan> tables = new HashSet<>();
+    pw.print(INITIALIZER_PROLOGUE);
+    chainOperators(pw, root, tables);
+    for (TableScan n : tables) {
+      String escaped = CompilerUtil.escapeJavaString(
+          Joiner.on('.').join(n.getTable().getQualifiedName()), true);
+      String r = NEW_LINE_JOINER.join(
+          "    if (!data.containsKey(%1$s))",
+          "      throw new RuntimeException(\"Cannot find table \" + %1$s);",
+          "  data.get(%1$s).open(CTX_%2$d);",
+          "");
+      pw.print(String.format(r, escaped, n.getId()));
+    }
+    pw.print("  }\n");
+  }
+
+  private void chainOperators(
+      PrintWriter pw, RelNode root, Set<TableScan> tables) {
+    String lastCtx = "r";
+    Queue<RelNode> q = new ArrayDeque<>();
+    q.add(root);
+    RelNode n;
+    while ((n = q.poll()) != null) {
+      pw.print(
+          String.format("    ChannelContext CTX_%d = Channels.chain(%2$s, " +
+                            "%3$s);\n", n.getId(), lastCtx, RelNodeCompiler
+              .getStageName(n)));
+      lastCtx = String.format("CTX_%d", n.getId());
+
+      if (n instanceof TableScan) {
+        tables.add((TableScan)n);
+      }
+
+      for (RelNode i : n.getInputs()) {
+        q.add(i);
+      }
+    }
+  }
+
+  public AbstractValuesProcessor compile(RelNode plan) throws Exception {
+    String javaCode = generateJavaSource(plan);
+    ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
+                                              PACKAGE_NAME + ".Processor",
+                                              javaCode, null);
+    return (AbstractValuesProcessor) cl.loadClass(
+        PACKAGE_NAME + ".Processor").newInstance();
+  }
+
+  private static void printEpilogue(
+      PrintWriter pw) throws Exception {
+    pw.print("}\n");
+  }
+
+  private static void printPrologue(PrintWriter pw) {
+    pw.append(PROLOGUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
index 0550035..5a21fba2 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
@@ -33,8 +33,17 @@ import java.util.TreeSet;
  * Compile RelNodes into individual functions.
  */
 class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
+  public static Joiner NEW_LINE_JOINER = Joiner.on('\n');
+
   private final PrintWriter pw;
   private final JavaTypeFactory typeFactory;
+  private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+    "  private static final ChannelHandler %1$s = ",
+    "    new AbstractChannelHandler() {",
+    "    @Override",
+    "    public void dataReceived(ChannelContext ctx, Values _data) {",
+    ""
+  );
 
   public Set<String> getReferredTables() {
     return Collections.unmodifiableSet(referredTables);
@@ -49,19 +58,17 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
 
   @Override
   Void visitFilter(Filter filter) throws Exception {
-    beginFunction(filter);
-    pw.print("  if (_data == null) return null;\n");
+    beginStage(filter);
     ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
     String r = filter.getCondition().accept(compiler);
-    pw.print(String.format("  return %s ? _data : null;\n", r));
-    endFunction();
+    pw.print(String.format("    if (%s) { ctx.emit(_data); }\n", r));
+    endStage();
     return null;
   }
 
   @Override
   Void visitProject(Project project) throws Exception {
-    beginFunction(project);
-    pw.print("  if (_data == null) return null;\n");
+    beginStage(project);
     ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
 
     int size = project.getChildExps().size();
@@ -70,9 +77,9 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
       res[i] = project.getChildExps().get(i).accept(compiler);
     }
 
-    pw.print(String.format("  return new Values(%s);\n", Joiner.on(',').join
-        (res)));
-    endFunction();
+    pw.print(String.format("    ctx.emit(new Values(%s));\n",
+                           Joiner.on(',').join(res)));
+    endStage();
     return null;
   }
 
@@ -85,23 +92,21 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
   Void visitTableScan(TableScan scan) throws Exception {
     String tableName = Joiner.on('_').join(scan.getTable().getQualifiedName());
     referredTables.add(tableName);
-    beginFunction(scan);
-    pw.print(String.format("  return _datasources[TABLE_%s].next();\n",
-                           tableName));
-    endFunction();
+    beginStage(scan);
+    pw.print("    ctx.emit(_data);\n");
+    endStage();
     return null;
   }
 
-  private void beginFunction(RelNode n) {
-    pw.print(String.format("private Values %s(%s) {\n", getFunctionName(n), n
-        .getInputs().isEmpty() ? "" : "Values _data"));
+  private void beginStage(RelNode n) {
+    pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
   }
 
-  private void endFunction() {
-    pw.print("}\n");
+  private void endStage() {
+    pw.print("  }\n  };\n");
   }
 
-  static String getFunctionName(RelNode n) {
-    return n.getClass().getSimpleName() + "_" + n.getId();
+  static String getStageName(RelNode n) {
+    return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId();
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
new file mode 100644
index 0000000..cf76964
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (C) 2010 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.javac;
+
+
+import javax.tools.DiagnosticListener;
+import javax.tools.FileObject;
+import javax.tools.ForwardingJavaFileManager;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileManager;
+import javax.tools.JavaFileObject;
+import javax.tools.SimpleJavaFileObject;
+import javax.tools.ToolProvider;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.singleton;
+
+/**
+ * This is a Java ClassLoader that will attempt to load a class from a string of source code.
+ *
+ * <h3>Example</h3>
+ *
+ * <pre>
+ * String className = "com.foo.MyClass";
+ * String classSource =
+ *   "package com.foo;\n" +
+ *   "public class MyClass implements Runnable {\n" +
+ *   "  @Override public void run() {\n" +
+ *   "   log(\"Hello world\");\n" +
+ *   "  }\n" +
+ *   "}";
+ *
+ * // Load class from source.
+ * ClassLoader classLoader = new CompilingClassLoader(
+ *     parentClassLoader, className, classSource);
+ * Class myClass = classLoader.loadClass(className);
+ *
+ * // Use it.
+ * Runnable instance = (Runnable)myClass.newInstance();
+ * instance.run();
+ * </pre>
+ *
+ * Only one chunk of source can be compiled per instance of CompilingClassLoader. If you need to
+ * compile more, create multiple CompilingClassLoader instances.
+ *
+ * Uses Java 1.6's in built compiler API.
+ *
+ * If the class cannot be compiled, loadClass() will throw a ClassNotFoundException and log the
+ * compile errors to System.err. If you don't want the messages logged, or want to explicitly handle
+ * the messages you can provide your own {@link javax.tools.DiagnosticListener} through
+ * {#setDiagnosticListener()}.
+ *
+ * @see java.lang.ClassLoader
+ * @see javax.tools.JavaCompiler
+ */
+public class CompilingClassLoader extends ClassLoader {
+
+  /**
+   * Thrown when code cannot be compiled.
+   */
+  public static class CompilerException extends Exception {
+    private static final long serialVersionUID = -2936958840023603270L;
+
+    public CompilerException(String message) {
+      super(message);
+    }
+  }
+
+  private final Map<String, ByteArrayOutputStream> byteCodeForClasses = new HashMap<>();
+
+  private static final URI EMPTY_URI;
+
+  static {
+    try {
+      // Needed to keep SimpleFileObject constructor happy.
+      EMPTY_URI = new URI("");
+    } catch (URISyntaxException e) {
+      throw new Error(e);
+    }
+  }
+
+  /**
+   * @param parent Parent classloader to resolve dependencies from.
+   * @param className Name of class to compile. eg. "com.foo.MyClass".
+   * @param sourceCode Java source for class. e.g. "package com.foo; class MyClass { ... }".
+   * @param diagnosticListener Notified of compiler errors (may be null).
+   */
+  public CompilingClassLoader(
+      ClassLoader parent,
+      String className,
+      String sourceCode,
+      DiagnosticListener<JavaFileObject> diagnosticListener)
+      throws CompilerException {
+    super(parent);
+    if (!compileSourceCodeToByteCode(className, sourceCode, diagnosticListener)) {
+      throw new CompilerException("Could not compile " + className);
+    }
+  }
+
+  /**
+   * Override ClassLoader's class resolving method. Don't call this directly, instead use
+   * {@link ClassLoader#loadClass(String)}.
+   */
+  @Override
+  public Class<?> findClass(String name) throws ClassNotFoundException {
+    ByteArrayOutputStream byteCode = byteCodeForClasses.get(name);
+    if (byteCode == null) {
+      throw new ClassNotFoundException(name);
+    }
+    return defineClass(name, byteCode.toByteArray(), 0, byteCode.size());
+  }
+
+  /**
+   * @return Whether compilation was successful.
+   */
+  private boolean compileSourceCodeToByteCode(
+      String className, String sourceCode, DiagnosticListener<JavaFileObject> diagnosticListener) {
+    JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
+
+    // Set up the in-memory filesystem.
+    InMemoryFileManager fileManager =
+        new InMemoryFileManager(javaCompiler.getStandardFileManager(null, null, null));
+    JavaFileObject javaFile = new InMemoryJavaFile(className, sourceCode);
+
+    // Javac option: remove these when the javac zip impl is fixed
+    // (http://b/issue?id=1822932)
+    System.setProperty("useJavaUtilZip", "true"); // setting value to any non-null string
+    List<String> options = new LinkedList<>();
+    // this is ignored by javac currently but useJavaUtilZip should be
+    // a valid javac XD option, which is another bug
+    options.add("-XDuseJavaUtilZip");
+
+    // Now compile!
+    JavaCompiler.CompilationTask compilationTask =
+        javaCompiler.getTask(
+            null, // Null: log any unhandled errors to stderr.
+            fileManager,
+            diagnosticListener,
+            options,
+            null,
+            singleton(javaFile));
+    return compilationTask.call();
+  }
+
+  /**
+   * Provides an in-memory representation of JavaFileManager abstraction, so we do not need to write
+   * any files to disk.
+   *
+   * When files are written to, rather than putting the bytes on disk, they are appended to buffers
+   * in byteCodeForClasses.
+   *
+   * @see javax.tools.JavaFileManager
+   */
+  private class InMemoryFileManager extends ForwardingJavaFileManager<JavaFileManager> {
+    public InMemoryFileManager(JavaFileManager fileManager) {
+      super(fileManager);
+    }
+
+    @Override
+    public JavaFileObject getJavaFileForOutput(
+        Location location, final String className, JavaFileObject.Kind kind, FileObject sibling)
+        throws IOException {
+      return new SimpleJavaFileObject(EMPTY_URI, kind) {
+        @Override
+        public OutputStream openOutputStream() throws IOException {
+          ByteArrayOutputStream outputStream = byteCodeForClasses.get(className);
+          if (outputStream != null) {
+            throw new IllegalStateException("Cannot write more than once");
+          }
+          // Reasonable size for a simple .class.
+          outputStream = new ByteArrayOutputStream(256);
+          byteCodeForClasses.put(className, outputStream);
+          return outputStream;
+        }
+      };
+    }
+  }
+
+  private static class InMemoryJavaFile extends SimpleJavaFileObject {
+    private final String sourceCode;
+
+    public InMemoryJavaFile(String className, String sourceCode) {
+      super(makeUri(className), Kind.SOURCE);
+      this.sourceCode = sourceCode;
+    }
+
+    private static URI makeUri(String className) {
+      try {
+        return new URI(className.replaceAll("\\.", "/") + Kind.SOURCE.extension);
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e); // Not sure what could cause this.
+      }
+    }
+
+    @Override
+    public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
+      return sourceCode;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
new file mode 100644
index 0000000..8d22a53
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import backtype.storm.tuple.Values;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestPlanCompiler {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  @Test
+  public void testCompile() throws Exception {
+    String sql = "SELECT ID + 1 FROM FOO WHERE ID > 2";
+    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+    PlanCompiler compiler = new PlanCompiler(typeFactory);
+    AbstractValuesProcessor proc = compiler.compile(state.tree);
+    Map<String, DataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockDataSource());
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    proc.initialize(data, h);
+    Assert.assertArrayEquals(new Values[] { new Values(4), new Values(5)},
+                             values.toArray());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
index 61f5409..cedb48b 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
@@ -38,16 +38,6 @@ public class TestRelNodeCompiler {
         RelDataTypeSystem.DEFAULT);
     LogicalProject project = (LogicalProject) state.tree;
     LogicalFilter filter = (LogicalFilter) project.getInput();
-    TableScan scan = (TableScan) filter.getInput();
-
-    try (StringWriter sw = new StringWriter();
-         PrintWriter pw = new PrintWriter(sw)
-    ) {
-      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
-      compiler.visitTableScan(scan);
-      pw.flush();
-      Assert.assertTrue(sw.toString().contains("_datasources[TABLE_FOO]"));
-    }
 
     try (StringWriter sw = new StringWriter();
          PrintWriter pw = new PrintWriter(sw)

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
index ae4300a..6731c90 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
@@ -1,13 +1,11 @@
 package org.apache.storm.sql.compiler;
 
-import com.google.common.collect.ImmutableList;
+import backtype.storm.tuple.Values;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.Schema;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
@@ -17,15 +15,21 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.storm.sql.storm.ChannelContext;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
 
 import java.util.ArrayList;
+import java.util.List;
 
-class TestUtils {
+public class TestUtils {
   static CalciteState sqlOverDummyTable(String sql)
       throws RelConversionException, ValidationException, SqlParseException {
     SchemaPlus schema = Frameworks.createRootSchema(true);
-    Table table = newTable().field("ID", SqlTypeName.INTEGER).build();
+    JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+        (RelDataTypeSystem.DEFAULT);
+    Table table = new CompilerUtil.TableBuilderInfo(typeFactory)
+        .field("ID", SqlTypeName.INTEGER).build();
     schema.add("FOO", table);
     FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
         schema).build();
@@ -36,90 +40,53 @@ class TestUtils {
     return new CalciteState(schema, tree);
   }
 
-  static class TableBuilderInfo {
-    private static class FieldType {
-      private static final int NO_PRECISION = -1;
-      private final String name;
-      private final SqlTypeName type;
-      private final int precision;
-
-      private FieldType(String name, SqlTypeName type, int precision) {
-        this.name = name;
-        this.type = type;
-        this.precision = precision;
-      }
+  static class CalciteState {
+    final SchemaPlus schema;
+    final RelNode tree;
 
-      private FieldType(String name, SqlTypeName type) {
-        this(name, type, NO_PRECISION);
-      }
+    private CalciteState(SchemaPlus schema, RelNode tree) {
+      this.schema = schema;
+      this.tree = tree;
     }
+  }
 
-    private final ArrayList<FieldType> fields = new ArrayList<>();
-    private final ArrayList<Object[]> rows = new ArrayList<>();
-    private Statistic stats;
+  public static class MockDataSource implements DataSource {
+    private final ArrayList<Values> RECORDS = new ArrayList<>();
 
-    TableBuilderInfo field(String name, SqlTypeName type) {
-      fields.add(new FieldType(name, type));
-      return this;
+    public MockDataSource() {
+      for (int i = 0; i < 5; ++i) {
+        RECORDS.add(new Values(i));
+      }
     }
 
-    TableBuilderInfo field(String name, SqlTypeName type, int precision) {
-      fields.add(new FieldType(name, type, precision));
-      return this;
+    @Override
+    public void open(ChannelContext ctx) {
+      for (Values v : RECORDS) {
+        ctx.emit(v);
+      }
+      ctx.fireChannelInactive();
     }
+  }
 
-    TableBuilderInfo statistics(Statistic stats) {
-      this.stats = stats;
-      return this;
-    }
+  public static class CollectDataChannelHandler implements ChannelHandler {
+    private final List<Values> values;
 
-    TableBuilderInfo rows(Object[] data) {
-      rows.add(data);
-      return this;
+    public CollectDataChannelHandler(List<Values> values) {
+      this.values = values;
     }
 
-    Table build() {
-      final Statistic stat = stats;
-      return new Table() {
-        @Override
-        public RelDataType getRowType(
-            RelDataTypeFactory relDataTypeFactory) {
-          RelDataTypeFactory.FieldInfoBuilder b = relDataTypeFactory.builder();
-          for (FieldType f : fields) {
-            if (f.precision == FieldType.NO_PRECISION) {
-              b.add(f.name, f.type);
-            } else {
-              b.add(f.name, f.type, f.precision);
-            }
-          }
-          return b.build();
-        }
-
-        @Override
-        public Statistic getStatistic() {
-          return stat != null ? stat : Statistics.of(rows.size(),
-                                                     ImmutableList.<ImmutableBitSet>of());
-        }
-
-        @Override
-        public Schema.TableType getJdbcTableType() {
-          return Schema.TableType.TABLE;
-        }
-      };
+    @Override
+    public void dataReceived(ChannelContext ctx, Values data) {
+      values.add(data);
     }
-  }
 
-  static TableBuilderInfo newTable() {
-    return new TableBuilderInfo();
-  }
+    @Override
+    public void channelInactive(ChannelContext ctx) {}
 
-  static class CalciteState {
-    final SchemaPlus schema;
-    final RelNode tree;
-
-    private CalciteState(SchemaPlus schema, RelNode tree) {
-      this.schema = schema;
-      this.tree = tree;
+    @Override
+    public void exceptionCaught(Throwable cause) {
+      throw new RuntimeException(cause);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
new file mode 100644
index 0000000..62f2d95
--- /dev/null
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-runtime</artifactId>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+      <sourceDirectory>src/jvm</sourceDirectory>
+      <testSourceDirectory>src/test</testSourceDirectory>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
new file mode 100644
index 0000000..cf110e3
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+
+public abstract class AbstractChannelHandler implements ChannelHandler {
+  @Override
+  public abstract void dataReceived(ChannelContext ctx, Values data);
+
+  @Override
+  public void channelInactive(ChannelContext ctx) {
+
+  }
+
+  @Override
+  public void exceptionCaught(Throwable cause) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
new file mode 100644
index 0000000..a2806b2
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+
+public interface ChannelContext {
+  /**
+   * Emit data to the next stage of the data pipeline.
+   */
+  void emit(Values data);
+  void fireChannelInactive();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
new file mode 100644
index 0000000..8cd3a28
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+
+/**
+ * DataListener provides an event-driven interface for the user to process
+ * series of events.
+ */
+public interface ChannelHandler {
+  void dataReceived(ChannelContext ctx, Values data);
+
+  /**
+   * The producer of the data has indicated that the channel is no longer
+   * active.
+   * @param ctx
+   */
+  void channelInactive(ChannelContext ctx);
+
+  void exceptionCaught(Throwable cause);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
new file mode 100644
index 0000000..b5bb619
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+
+public class Channels {
+  private static final ChannelContext VOID_CTX = new ChannelContext() {
+    @Override
+    public void emit(Values data) {}
+
+    @Override
+    public void fireChannelInactive() {}
+  };
+
+  private static class ChannelContextAdapter implements ChannelContext {
+    private final ChannelHandler handler;
+    private final ChannelContext next;
+
+    public ChannelContextAdapter(
+        ChannelContext next, ChannelHandler handler) {
+      this.handler = handler;
+      this.next = next;
+    }
+
+    @Override
+    public void emit(Values data) {
+      handler.dataReceived(next, data);
+    }
+
+    @Override
+    public void fireChannelInactive() {
+      handler.channelInactive(next);
+    }
+  }
+
+  private static class ForwardingChannelContext implements ChannelContext {
+    private final ChannelContext next;
+
+    public ForwardingChannelContext(ChannelContext next) {
+      this.next = next;
+    }
+
+    @Override
+    public void emit(Values data) {
+      next.emit(data);
+    }
+
+    @Override
+    public void fireChannelInactive() {
+      next.fireChannelInactive();
+    }
+  }
+
+  public static ChannelContext chain(
+      ChannelContext next, ChannelHandler handler) {
+    return new ChannelContextAdapter(next, handler);
+  }
+
+  public static ChannelContext voidContext() {
+    return VOID_CTX;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
new file mode 100644
index 0000000..84fa6e0
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+/**
+ * A DataSource ingests data in StormSQL. It provides a series of tuple to
+ * the downstream {@link ChannelHandler}.
+ *
+ */
+public interface DataSource {
+  void open(ChannelContext ctx);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
new file mode 100644
index 0000000..bd068be
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
@@ -0,0 +1,29 @@
+package org.apache.storm.sql.storm.runtime;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+
+import java.util.Map;
+
+/**
+ * Subclass of AbstractTupleProcessor provides a series of tuple. It
+ * takes a series of iterators of {@link Values} and produces a stream of
+ * tuple.
+ *
+ * The subclass implements the {@see next()} method to provide
+ * the output of the stream. It can choose to return null in {@see next()} to
+ * indicate that this particular iteration is a no-op. SQL processors depend
+ * on this semantic to implement filtering and nullable records.
+ */
+public abstract class AbstractValuesProcessor {
+
+  /**
+   * Initialize the data sources.
+   *
+   * @param data a map from the table name to the iterators of the values.
+   *
+   */
+  public abstract void initialize(Map<String, DataSource> data, ChannelHandler
+      result);
+}