You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/10/27 18:08:13 UTC

[9/9] storm git commit: Compile logical plans to Java code.

Compile logical plans to Java code.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2955a789
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2955a789
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2955a789

Branch: refs/heads/STORM-1040
Commit: 2955a78937cfafd7db40276a4893165db9c38f3d
Parents: 54723fa
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Oct 22 16:52:59 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Oct 26 10:04:16 2015 -0700

----------------------------------------------------------------------
 external/sql/pom.xml                            |   1 +
 external/sql/storm-sql-core/pom.xml             |   6 +
 .../apache/storm/sql/compiler/PlanCompiler.java | 136 ++++++++++++
 .../storm/sql/javac/CompilingClassLoader.java   | 221 +++++++++++++++++++
 .../storm/sql/compiler/TestPlanCompiler.java    |  62 ++++++
 external/sql/storm-sql-runtime/pom.xml          |  65 ++++++
 .../apache/storm/sql/storm/ValueIterator.java   |  55 +++++
 .../storm/runtime/AbstractValuesProcessor.java  |  47 ++++
 .../storm/sql/storm/TestValueIterator.java      | 107 +++++++++
 9 files changed, 700 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 73e7b31..e4dae94 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -38,5 +38,6 @@
 
     <modules>
         <module>storm-sql-core</module>
+        <module>storm-sql-runtime</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml
index 562f12f..0977c45 100644
--- a/external/sql/storm-sql-core/pom.xml
+++ b/external/sql/storm-sql-core/pom.xml
@@ -43,6 +43,12 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.calcite</groupId>
             <artifactId>calcite-core</artifactId>
             <version>${calcite.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
new file mode 100644
index 0000000..5d910e7
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class PlanCompiler {
+  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
+  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
+  private static final String PROLOGUE = NEW_LINE_JOINER.join(
+      "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
+      "import java.util.Iterator;", "import java.util.Map;",
+      "import backtype.storm.tuple.Values;",
+      "import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;", "",
+      "public final class Processor extends AbstractValuesProcessor {", "",
+      "@Override",
+      "protected Iterator<Values>[] getDataSource() { return _datasources; }","");
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  private String generateJavaSource(RelNode root) throws Exception {
+    StringWriter sw = new StringWriter();
+    try (PrintWriter pw = new PrintWriter(sw)) {
+      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+      printPrologue(pw);
+      compiler.traverse(root);
+      printTableReference(pw, compiler.getReferredTables());
+      printEpilogue(pw, root);
+    }
+    return sw.toString();
+  }
+
+  private void printTableReference(PrintWriter pw, Set<String> referredTables) {
+    Map<String, Integer> ids = new HashMap<>();
+    for (String s : referredTables) {
+      pw.print(String.format("  private static final int TABLE_%s = %d;\n", s,
+                             ids.size()));
+      ids.put(s, ids.size());
+    }
+
+    pw.print("  @SuppressWarnings(\"unchecked\")\n");
+    pw.print(String.format(
+        "  private final Iterator<Values>[] _datasources = new Iterator[%d];\n",
+        referredTables.size()));
+    pw.print(
+        "  @Override public void initialize(Map<String, Iterator<Values>> " +
+            "data) {\n");
+    for (String s : referredTables) {
+      String escaped = CompilerUtil.escapeJavaString(s, true);
+      String r = NEW_LINE_JOINER.join("  if (!data.containsKey(%1$s))",
+                                      "    throw new RuntimeException(\"Cannot find table \" + %1$s);",
+                                      "  _datasources[%2$d] = data.get(%1$s);");
+      pw.print(String.format(r, escaped, ids.get(s)));
+    }
+    pw.print("}\n");
+  }
+
+  public AbstractValuesProcessor compile(RelNode plan) throws Exception {
+    String javaCode = generateJavaSource(plan);
+    ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
+                                              PACKAGE_NAME + ".Processor",
+                                              javaCode, null);
+    AbstractValuesProcessor processor = (AbstractValuesProcessor) cl.loadClass(
+        PACKAGE_NAME + ".Processor").newInstance();
+    return processor;
+  }
+
+  private static void printEpilogue(
+      PrintWriter pw, RelNode root) throws Exception {
+    pw.print("  @Override public Values next() {\n");
+    MainBodyCompiler compiler = new MainBodyCompiler(pw);
+    compiler.traverse(root);
+    pw.print(String.format("  return t%d;\n", root.getId()));
+    pw.print("  }\n");
+    pw.print("}\n");
+  }
+
+  private static void printPrologue(PrintWriter pw) {
+    pw.append(PROLOGUE);
+  }
+
+  private static class MainBodyCompiler extends PostOrderRelNodeVisitor<Void> {
+    private final PrintWriter pw;
+
+    private MainBodyCompiler(PrintWriter pw) {
+      this.pw = pw;
+    }
+
+    @Override
+    Void defaultValue(RelNode n) {
+      String params;
+      if (n instanceof TableScan) {
+        params = "";
+      } else {
+        ArrayList<String> inputIds = new ArrayList<>();
+        for (RelNode i : n.getInputs()) {
+          inputIds.add("t" + i.getId());
+        }
+        params = Joiner.on(",").join(inputIds);
+      }
+      String l = String.format("  Values t%d = %s(%s);\n", n.getId(),
+                               RelNodeCompiler.getFunctionName(n), params);
+      pw.print(l);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
new file mode 100644
index 0000000..cf76964
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (C) 2010 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.javac;
+
+
+import javax.tools.DiagnosticListener;
+import javax.tools.FileObject;
+import javax.tools.ForwardingJavaFileManager;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileManager;
+import javax.tools.JavaFileObject;
+import javax.tools.SimpleJavaFileObject;
+import javax.tools.ToolProvider;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.singleton;
+
+/**
+ * A {@link ClassLoader} that compiles a single string of Java source code in memory and then
+ * serves the resulting class (and any other classes javac emits for that source) from
+ * {@link #findClass(String)}.
+ *
+ * <h3>Example</h3>
+ *
+ * <pre>
+ * String className = "com.foo.MyClass";
+ * String classSource =
+ *   "package com.foo;\n" +
+ *   "public class MyClass implements Runnable {\n" +
+ *   "  @Override public void run() {\n" +
+ *   "   System.out.println(\"Hello world\");\n" +
+ *   "  }\n" +
+ *   "}";
+ *
+ * // Compile the source; throws CompilerException if it does not compile.
+ * ClassLoader classLoader = new CompilingClassLoader(
+ *     parentClassLoader, className, classSource, null);
+ * Class&lt;?&gt; myClass = classLoader.loadClass(className);
+ *
+ * // Use it.
+ * Runnable instance = (Runnable) myClass.getDeclaredConstructor().newInstance();
+ * instance.run();
+ * </pre>
+ *
+ * Only one chunk of source can be compiled per instance of CompilingClassLoader. To compile
+ * more, create additional CompilingClassLoader instances.
+ *
+ * Uses the {@link javax.tools.JavaCompiler} API, which is only available when running on a JDK
+ * (not a plain JRE).
+ *
+ * Compilation happens eagerly in the constructor. If the source cannot be compiled, the
+ * constructor throws {@link CompilerException}. Diagnostics go to the supplied
+ * {@link javax.tools.DiagnosticListener}. When the listener is {@code null}, javac's default
+ * reporting is used, which writes to {@code System.err}.
+ *
+ * @see java.lang.ClassLoader
+ * @see javax.tools.JavaCompiler
+ */
+public class CompilingClassLoader extends ClassLoader {
+
+  /**
+   * Thrown when code cannot be compiled.
+   */
+  public static class CompilerException extends Exception {
+    private static final long serialVersionUID = -2936958840023603270L;
+
+    public CompilerException(String message) {
+      super(message);
+    }
+  }
+
+  /** Compiled byte code keyed by binary class name, filled in by InMemoryFileManager. */
+  private final Map<String, ByteArrayOutputStream> byteCodeForClasses = new HashMap<>();
+
+  private static final URI EMPTY_URI;
+
+  static {
+    try {
+      // Needed to keep SimpleFileObject constructor happy.
+      EMPTY_URI = new URI("");
+    } catch (URISyntaxException e) {
+      throw new Error(e);
+    }
+  }
+
+  /**
+   * Compiles {@code sourceCode} immediately.
+   *
+   * @param parent Parent classloader to resolve dependencies from.
+   * @param className Name of class to compile. eg. "com.foo.MyClass".
+   * @param sourceCode Java source for class. e.g. "package com.foo; class MyClass { ... }".
+   * @param diagnosticListener Notified of compiler errors (may be null).
+   * @throws CompilerException if the source does not compile, or if no system Java compiler
+   *     is available.
+   */
+  public CompilingClassLoader(
+      ClassLoader parent,
+      String className,
+      String sourceCode,
+      DiagnosticListener<JavaFileObject> diagnosticListener)
+      throws CompilerException {
+    super(parent);
+    if (!compileSourceCodeToByteCode(className, sourceCode, diagnosticListener)) {
+      throw new CompilerException("Could not compile " + className);
+    }
+  }
+
+  /**
+   * Override ClassLoader's class resolving method. Don't call this directly, instead use
+   * {@link ClassLoader#loadClass(String)}.
+   */
+  @Override
+  public Class<?> findClass(String name) throws ClassNotFoundException {
+    ByteArrayOutputStream byteCode = byteCodeForClasses.get(name);
+    if (byteCode == null) {
+      throw new ClassNotFoundException(name);
+    }
+    return defineClass(name, byteCode.toByteArray(), 0, byteCode.size());
+  }
+
+  /**
+   * Compiles {@code sourceCode} into {@link #byteCodeForClasses}.
+   *
+   * @return Whether compilation was successful.
+   * @throws CompilerException if no system Java compiler is available (e.g. running on a JRE).
+   */
+  private boolean compileSourceCodeToByteCode(
+      String className, String sourceCode, DiagnosticListener<JavaFileObject> diagnosticListener)
+      throws CompilerException {
+    JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
+    if (javaCompiler == null) {
+      // ToolProvider returns null rather than throwing when javac is absent.
+      throw new CompilerException("Could not compile " + className
+          + ": no system Java compiler is available (running on a JRE instead of a JDK?)");
+    }
+
+    // Set up the in-memory filesystem.
+    InMemoryFileManager fileManager =
+        new InMemoryFileManager(javaCompiler.getStandardFileManager(null, null, null));
+    try {
+      JavaFileObject javaFile = new InMemoryJavaFile(className, sourceCode);
+
+      // Javac option: remove these when the javac zip impl is fixed
+      // (http://b/issue?id=1822932)
+      // NOTE(review): this sets a JVM-wide system property as a side effect of every compile.
+      System.setProperty("useJavaUtilZip", "true"); // setting value to any non-null string
+      List<String> options = new LinkedList<>();
+      // this is ignored by javac currently but useJavaUtilZip should be
+      // a valid javac XD option, which is another bug
+      options.add("-XDuseJavaUtilZip");
+
+      // Now compile!
+      JavaCompiler.CompilationTask compilationTask =
+          javaCompiler.getTask(
+              null, // Null: log any unhandled errors to stderr.
+              fileManager,
+              diagnosticListener,
+              options,
+              null,
+              singleton(javaFile));
+      return compilationTask.call();
+    } finally {
+      // The byte code already lives in byteCodeForClasses, so release the
+      // underlying standard file manager, which may hold open resources.
+      closeQuietly(fileManager);
+    }
+  }
+
+  /** Closes {@code fileManager}, ignoring failures because compilation has already finished. */
+  private static void closeQuietly(JavaFileManager fileManager) {
+    try {
+      fileManager.close();
+    } catch (IOException ignored) {
+      // Best effort: a failure to release compiler resources must not turn a
+      // finished compilation into an error.
+    }
+  }
+
+  /**
+   * Provides an in-memory representation of JavaFileManager abstraction, so we do not need to write
+   * any files to disk.
+   *
+   * When files are written to, rather than putting the bytes on disk, they are appended to buffers
+   * in byteCodeForClasses.
+   *
+   * @see javax.tools.JavaFileManager
+   */
+  private class InMemoryFileManager extends ForwardingJavaFileManager<JavaFileManager> {
+    public InMemoryFileManager(JavaFileManager fileManager) {
+      super(fileManager);
+    }
+
+    @Override
+    public JavaFileObject getJavaFileForOutput(
+        Location location, final String className, JavaFileObject.Kind kind, FileObject sibling)
+        throws IOException {
+      return new SimpleJavaFileObject(EMPTY_URI, kind) {
+        @Override
+        public OutputStream openOutputStream() throws IOException {
+          ByteArrayOutputStream outputStream = byteCodeForClasses.get(className);
+          if (outputStream != null) {
+            throw new IllegalStateException("Cannot write more than once");
+          }
+          // Reasonable size for a simple .class.
+          outputStream = new ByteArrayOutputStream(256);
+          byteCodeForClasses.put(className, outputStream);
+          return outputStream;
+        }
+      };
+    }
+  }
+
+  /** A source file whose contents are held in memory rather than read from disk. */
+  private static class InMemoryJavaFile extends SimpleJavaFileObject {
+    private final String sourceCode;
+
+    public InMemoryJavaFile(String className, String sourceCode) {
+      super(makeUri(className), Kind.SOURCE);
+      this.sourceCode = sourceCode;
+    }
+
+    /** Maps "com.foo.MyClass" to the relative URI "com/foo/MyClass.java". */
+    private static URI makeUri(String className) {
+      try {
+        return new URI(className.replaceAll("\\.", "/") + Kind.SOURCE.extension);
+      } catch (URISyntaxException e) {
+        // Raised if className contains characters that are illegal in a URI.
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
+      return sourceCode;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
new file mode 100644
index 0000000..768a97e
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.storm.ValueIterator;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class TestPlanCompiler {
+  /** Five single-column rows holding the integers 0 through 4. */
+  private static final List<Values> INPUTS = buildInputs();
+
+  private static List<Values> buildInputs() {
+    List<Values> rows = new ArrayList<>();
+    for (int id = 0; id < 5; id++) {
+      rows.add(new Values(id));
+    }
+    return Collections.unmodifiableList(rows);
+  }
+
+  @Test
+  public void testCompile() throws Exception {
+    // Only IDs 3 and 4 survive the filter; the projection adds one to each.
+    String sql = "SELECT ID + 1 FROM FOO WHERE ID > 2";
+    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+    AbstractValuesProcessor proc = new PlanCompiler().compile(state.tree);
+
+    Map<String, Iterator<Values>> tables = new HashMap<>();
+    tables.put("FOO", INPUTS.iterator());
+    proc.initialize(tables);
+
+    List<Integer> results = new ArrayList<>();
+    for (ValueIterator it = new ValueIterator(proc); it.hasNext(); ) {
+      results.add((Integer) it.next().get(0));
+    }
+    Assert.assertEquals(2, results.size());
+    Assert.assertEquals(4, results.get(0).intValue());
+    Assert.assertEquals(5, results.get(1).intValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
new file mode 100644
index 0000000..62f2d95
--- /dev/null
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-runtime</artifactId>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+      <sourceDirectory>src/jvm</sourceDirectory>
+      <testSourceDirectory>src/test</testSourceDirectory>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ValueIterator.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ValueIterator.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ValueIterator.java
new file mode 100644
index 0000000..d32fdbf
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ValueIterator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+
+import java.util.Iterator;
+
+/**
+ * A ValuesIterator wraps an instance of {@link AbstractValuesProcessor} to
+ * provide an iterator of non-null {@link Values}.
+ */
+public class ValueIterator implements Iterator<Values> {
+  private final AbstractValuesProcessor processor;
+  /** Look-ahead element fetched by {@link #hasNext()}, or null when nothing is buffered. */
+  private Values next;
+
+  /**
+   * @param processor the processor whose non-null outputs this iterator yields.
+   */
+  public ValueIterator(AbstractValuesProcessor processor) {
+    this.processor = processor;
+  }
+
+  /**
+   * Returns the next non-null output of the processor.
+   *
+   * @throws java.util.NoSuchElementException if the processor has no more output.
+   */
+  @Override
+  public Values next() {
+    // Fill the look-ahead buffer first, so next() also works without a
+    // preceding hasNext() call and fails loudly when exhausted, as the
+    // Iterator contract requires.
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException();
+    }
+    Values result = next;
+    next = null;
+    return result;
+  }
+
+  /**
+   * Pulls from the processor until it yields a non-null row or reports that
+   * its data sources are exhausted. A null row from the processor means the
+   * input was skipped (e.g. filtered out).
+   */
+  @Override
+  public boolean hasNext() {
+    while (next == null && processor.mayHasNext()) {
+      next = processor.next();
+    }
+    return next != null;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("remove");
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
new file mode 100644
index 0000000..97ce217
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
@@ -0,0 +1,47 @@
+package org.apache.storm.sql.storm.runtime;
+
+import backtype.storm.tuple.Values;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Subclass of AbstractTupleProcessor provides a series of tuple. It
+ * takes a series of iterators of {@link Values} and produces a stream of
+ * tuple.
+ *
+ * The subclass implements the {@see next()} method to provide
+ * the output of the stream. It can choose to return null in {@see next()} to
+ * indicate that this particular iteration is a no-op. SQL processors depend
+ * on this semantic to implement filtering and nullable records.
+ */
+public abstract class AbstractValuesProcessor {
+
+  /**
+   * Produces the next tuple of the output stream. Implementations may return
+   * null to indicate that this particular iteration is a no-op; SQL
+   * processors depend on this to implement filtering and nullable records.
+   *
+   * @return the next tuple in the output stream, or null if the tuple should
+   * be skipped.
+   */
+  public abstract Values next();
+
+  /**
+   * Initializes the data sources.
+   *
+   * @param data a map from the table name to the iterator of its values.
+   */
+  public abstract void initialize(Map<String, Iterator<Values>> data);
+
+  /**
+   * @return the data source iterators this processor consumes; consulted by
+   * {@link #mayHasNext()}.
+   */
+  protected abstract Iterator<Values>[] getDataSource();
+
+  /**
+   * Returns whether the output stream may produce more output, i.e. whether
+   * any data source still has unread input. A true result does not guarantee
+   * that {@link #next()} will return a non-null tuple.
+   */
+  public boolean mayHasNext() {
+    for (Iterator<Values> source : getDataSource()) {
+      if (source.hasNext()) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/storm/TestValueIterator.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/storm/TestValueIterator.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/storm/TestValueIterator.java
new file mode 100644
index 0000000..e74257f
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/storm/TestValueIterator.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestValueIterator {
+
+  /** Five single-column rows holding the integers 0 through 4. */
+  private static final List<Values> INPUTS;
+
+  static {
+    List<Values> rows = new ArrayList<>();
+    for (int id = 0; id < 5; id++) {
+      rows.add(new Values(id));
+    }
+    INPUTS = Collections.unmodifiableList(rows);
+  }
+
+  /**
+   * Processor stub backed by one fresh iterator over {@link #INPUTS};
+   * subclasses decide what {@link #next()} emits.
+   */
+  private abstract static class SingleSourceProcessor extends AbstractValuesProcessor {
+    @SuppressWarnings("unchecked")
+    final Iterator<Values>[] source = new Iterator[] {INPUTS.iterator()};
+
+    @Override
+    public void initialize(Map<String, Iterator<Values>> data) {
+      // Nothing to do: the input is wired directly through the source field.
+    }
+
+    @Override
+    protected Iterator<Values>[] getDataSource() {
+      return source;
+    }
+  }
+
+  @Test
+  public void testPassThrough() {
+    AbstractValuesProcessor processor = new SingleSourceProcessor() {
+      @Override
+      public Values next() {
+        return source[0].next();
+      }
+    };
+    List<Values> records = Lists.newArrayList(new ValueIterator(processor));
+    assertEquals(INPUTS.size(), records.size());
+  }
+
+  @Test
+  public void testFilter() {
+    AbstractValuesProcessor processor = new SingleSourceProcessor() {
+      @Override
+      public Values next() {
+        // Keep rows below 2; returning null marks the row as filtered out.
+        Values row = source[0].next();
+        if ((int) row.get(0) < 2) {
+          return row;
+        }
+        return null;
+      }
+    };
+    List<Values> records = Lists.newArrayList(new ValueIterator(processor));
+    assertEquals(2, records.size());
+  }
+}