You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:09:56 UTC
[10/20] storm git commit: [StormSQL] Compile logical plans to Java
code.
[StormSQL] Compile logical plans to Java code.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0a257f12
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0a257f12
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0a257f12
Branch: refs/heads/master
Commit: 0a257f12527a4aec6c477147ff8390ac08c92136
Parents: bd4f6dc
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Oct 22 16:52:59 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:02 2015 -0800
----------------------------------------------------------------------
external/sql/pom.xml | 1 +
external/sql/storm-sql-core/pom.xml | 11 +
.../apache/storm/sql/compiler/CompilerUtil.java | 95 +++++++-
.../apache/storm/sql/compiler/PlanCompiler.java | 131 +++++++++++
.../storm/sql/compiler/RelNodeCompiler.java | 45 ++--
.../storm/sql/javac/CompilingClassLoader.java | 221 +++++++++++++++++++
.../storm/sql/compiler/TestPlanCompiler.java | 53 +++++
.../storm/sql/compiler/TestRelNodeCompiler.java | 10 -
.../apache/storm/sql/compiler/TestUtils.java | 125 ++++-------
external/sql/storm-sql-runtime/pom.xml | 65 ++++++
.../storm/sql/storm/AbstractChannelHandler.java | 35 +++
.../apache/storm/sql/storm/ChannelContext.java | 28 +++
.../apache/storm/sql/storm/ChannelHandler.java | 37 ++++
.../org/apache/storm/sql/storm/Channels.java | 78 +++++++
.../org/apache/storm/sql/storm/DataSource.java | 27 +++
.../storm/runtime/AbstractValuesProcessor.java | 29 +++
16 files changed, 881 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 73e7b31..e4dae94 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -38,5 +38,6 @@
<modules>
<module>storm-sql-core</module>
+ <module>storm-sql-runtime</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml
index 3ca1ced..bcace6c 100644
--- a/external/sql/storm-sql-core/pom.xml
+++ b/external/sql/storm-sql-core/pom.xml
@@ -43,6 +43,12 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>${calcite.version}</version>
@@ -52,6 +58,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<sourceDirectory>src/jvm</sourceDirectory>
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
index 5e7453a..1a48052 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -17,9 +17,22 @@
*/
package org.apache.storm.sql.compiler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
-class CompilerUtil {
+import java.util.ArrayList;
+
+public class CompilerUtil {
static String escapeJavaString(String s, boolean nullMeansNull) {
if(s == null) {
return nullMeansNull ? "null" : "\"\"";
@@ -32,4 +45,84 @@ class CompilerUtil {
return "\"" + s5 + "\"";
}
}
+
+ /**
+  * Fluent builder that collects column definitions (and, for tests, rows)
+  * and turns them into a Calcite {@link Table}.
+  *
+  * <p>Column types are created eagerly through the {@link RelDataTypeFactory}
+  * handed to the constructor. The table returned by {@link #build()} is a
+  * snapshot: calling the builder again afterwards does not alter it.
+  */
+ public static class TableBuilderInfo {
+   private final RelDataTypeFactory typeFactory;
+
+   /**
+    * @param typeFactory factory used to create the {@link RelDataType} of
+    *                    every column added through {@code field(...)}
+    */
+   public TableBuilderInfo(RelDataTypeFactory typeFactory) {
+     this.typeFactory = typeFactory;
+   }
+
+   /** A column name together with its resolved type. */
+   private static class FieldType {
+     private final String name;
+     private final RelDataType relDataType;
+
+     private FieldType(String name, RelDataType relDataType) {
+       this.name = name;
+       this.relDataType = relDataType;
+     }
+
+   }
+
+   private final ArrayList<FieldType> fields = new ArrayList<>();
+   private final ArrayList<Object[]> rows = new ArrayList<>();
+   private Statistic stats;
+
+   /** Adds a column of the given SQL type. */
+   public TableBuilderInfo field(String name, SqlTypeName type) {
+     return addField(name, typeFactory.createSqlType(type));
+   }
+
+   /** Adds a column of the given SQL type with an explicit precision. */
+   public TableBuilderInfo field(String name, SqlTypeName type, int
+       precision) {
+     return addField(name, typeFactory.createSqlType(type, precision));
+   }
+
+   /** Adds a column whose type is derived from a parsed SQL type spec. */
+   public TableBuilderInfo field(
+       String name, SqlDataTypeSpec type) {
+     return addField(name, type.deriveType(typeFactory));
+   }
+
+   /** Records one column; shared by all {@code field(...)} overloads. */
+   private TableBuilderInfo addField(String name, RelDataType dataType) {
+     fields.add(new FieldType(name, dataType));
+     return this;
+   }
+
+   /**
+    * Sets the statistic reported by the built table. When left unset, the
+    * table reports the number of rows added through {@link #rows(Object[])}
+    * and no keys.
+    */
+   public TableBuilderInfo statistics(Statistic stats) {
+     this.stats = stats;
+     return this;
+   }
+
+   /** Adds one row; only the row count is used, for the default statistic. */
+   @VisibleForTesting
+   public TableBuilderInfo rows(Object[] data) {
+     rows.add(data);
+     return this;
+   }
+
+   /**
+    * Builds the table from the columns, rows and statistic collected so far.
+    *
+    * @return a table whose row type lists the columns in insertion order
+    */
+   public Table build() {
+     // Snapshot the builder state so that later field()/rows()/statistics()
+     // calls cannot change a table that has already been handed out.
+     final Statistic stat = stats;
+     final ImmutableList<FieldType> columns = ImmutableList.copyOf(fields);
+     final int rowCount = rows.size();
+     return new Table() {
+       @Override
+       public RelDataType getRowType(
+           RelDataTypeFactory relDataTypeFactory) {
+         RelDataTypeFactory.FieldInfoBuilder b = relDataTypeFactory.builder();
+         for (FieldType f : columns) {
+           b.add(f.name, f.relDataType);
+         }
+         return b.build();
+       }
+
+       @Override
+       public Statistic getStatistic() {
+         return stat != null ? stat : Statistics.of(rowCount,
+             ImmutableList.<ImmutableBitSet>of());
+       }
+
+       @Override
+       public Schema.TableType getJdbcTableType() {
+         return Schema.TableType.TABLE;
+       }
+     };
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
new file mode 100644
index 0000000..d006261
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * Turns a Calcite logical plan into the Java source of a single
+ * {@code Processor} class, compiles that source in memory and instantiates it.
+ *
+ * <p>The generated class contains one channel-handler stage per plan node
+ * (emitted by {@link RelNodeCompiler}) plus an {@code initialize} method that
+ * chains the stages together and opens every referenced data source.
+ */
+public class PlanCompiler {
+  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
+  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
+  // Header of the generated compilation unit, up to the opening class brace.
+  private static final String PROLOGUE = NEW_LINE_JOINER.join(
+      "// GENERATED CODE",
+      "package " + PACKAGE_NAME + ";",
+      "",
+      "import java.util.Iterator;",
+      "import java.util.Map;",
+      "import backtype.storm.tuple.Values;",
+      "import org.apache.storm.sql.storm.AbstractChannelHandler;",
+      "import org.apache.storm.sql.storm.Channels;",
+      "import org.apache.storm.sql.storm.ChannelContext;",
+      "import org.apache.storm.sql.storm.ChannelHandler;",
+      "import org.apache.storm.sql.storm.DataSource;",
+      "import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;",
+      "public final class Processor extends AbstractValuesProcessor {",
+      "");
+  // Opening of the generated initialize() method; "r" is the context that
+  // delivers rows to the caller-supplied result handler.
+  private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
+      " @Override",
+      " public void initialize(Map<String, DataSource> data,",
+      " ChannelHandler result) {",
+      " ChannelContext r = Channels.chain(Channels.voidContext(), result);",
+      ""
+  );
+
+  private final JavaTypeFactory typeFactory;
+
+  /**
+   * @param typeFactory type factory handed to the {@link RelNodeCompiler}
+   *                    that generates the per-node stages
+   */
+  public PlanCompiler(JavaTypeFactory typeFactory) {
+    this.typeFactory = typeFactory;
+  }
+
+  /** Produces the complete source text of the generated class. */
+  private String generateJavaSource(RelNode root) throws Exception {
+    StringWriter buffer = new StringWriter();
+    try (PrintWriter out = new PrintWriter(buffer)) {
+      RelNodeCompiler stageCompiler = new RelNodeCompiler(out, typeFactory);
+      printPrologue(out);
+      stageCompiler.traverse(root);
+      printMain(out, root);
+      printEpilogue(out);
+    }
+    return buffer.toString();
+  }
+
+  /**
+   * Emits the body of {@code initialize}: first the context chain, then one
+   * existence check and {@code open} call per table scan found in the plan.
+   */
+  private void printMain(PrintWriter pw, RelNode root) {
+    Set<TableScan> scans = new HashSet<>();
+    pw.print(INITIALIZER_PROLOGUE);
+    chainOperators(pw, root, scans);
+
+    // %1$s is the table name as a Java string literal, %2$d the scan's id.
+    final String openTemplate = NEW_LINE_JOINER.join(
+        " if (!data.containsKey(%1$s))",
+        " throw new RuntimeException(\"Cannot find table \" + %1$s);",
+        " data.get(%1$s).open(CTX_%2$d);",
+        "");
+    for (TableScan scan : scans) {
+      String qualifiedName =
+          Joiner.on('.').join(scan.getTable().getQualifiedName());
+      String tableLiteral = CompilerUtil.escapeJavaString(qualifiedName, true);
+      pw.print(String.format(openTemplate, tableLiteral, scan.getId()));
+    }
+    pw.print(" }\n");
+  }
+
+  /**
+   * Walks the plan breadth-first from the root and emits one
+   * {@code ChannelContext CTX_<id>} per node, each chained onto the context
+   * emitted just before it. Table scans met on the way are added to
+   * {@code tables}.
+   */
+  private void chainOperators(
+      PrintWriter pw, RelNode root, Set<TableScan> tables) {
+    String previousCtx = "r";
+    Queue<RelNode> pending = new ArrayDeque<>();
+    pending.add(root);
+    while (!pending.isEmpty()) {
+      RelNode node = pending.remove();
+      String stage = RelNodeCompiler.getStageName(node);
+      pw.print(String.format(
+          " ChannelContext CTX_%d = Channels.chain(%2$s, %3$s);\n",
+          node.getId(), previousCtx, stage));
+      previousCtx = String.format("CTX_%d", node.getId());
+
+      if (node instanceof TableScan) {
+        tables.add((TableScan) node);
+      }
+      pending.addAll(node.getInputs());
+    }
+  }
+
+  /**
+   * Generates, compiles and instantiates the processor for {@code plan}.
+   *
+   * @param plan root of the logical plan
+   * @return a fresh instance of the generated {@code Processor} class
+   * @throws Exception if code generation, compilation or instantiation fails
+   */
+  public AbstractValuesProcessor compile(RelNode plan) throws Exception {
+    final String processorClass = PACKAGE_NAME + ".Processor";
+    String source = generateJavaSource(plan);
+    ClassLoader loader = new CompilingClassLoader(
+        getClass().getClassLoader(), processorClass, source, null);
+    return (AbstractValuesProcessor) loader.loadClass(processorClass)
+        .newInstance();
+  }
+
+  /** Closes the generated class body. */
+  private static void printEpilogue(
+      PrintWriter pw) throws Exception {
+    pw.print("}\n");
+  }
+
+  /** Writes the package declaration, imports and class header. */
+  private static void printPrologue(PrintWriter pw) {
+    pw.append(PROLOGUE);
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
index 0550035..5a21fba2 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
@@ -33,8 +33,17 @@ import java.util.TreeSet;
* Compile RelNodes into individual functions.
*/
class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
+ public static Joiner NEW_LINE_JOINER = Joiner.on('\n');
+
private final PrintWriter pw;
private final JavaTypeFactory typeFactory;
+ private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+ " private static final ChannelHandler %1$s = ",
+ " new AbstractChannelHandler() {",
+ " @Override",
+ " public void dataReceived(ChannelContext ctx, Values _data) {",
+ ""
+ );
public Set<String> getReferredTables() {
return Collections.unmodifiableSet(referredTables);
@@ -49,19 +58,17 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
@Override
Void visitFilter(Filter filter) throws Exception {
- beginFunction(filter);
- pw.print(" if (_data == null) return null;\n");
+ beginStage(filter);
ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
String r = filter.getCondition().accept(compiler);
- pw.print(String.format(" return %s ? _data : null;\n", r));
- endFunction();
+ pw.print(String.format(" if (%s) { ctx.emit(_data); }\n", r));
+ endStage();
return null;
}
@Override
Void visitProject(Project project) throws Exception {
- beginFunction(project);
- pw.print(" if (_data == null) return null;\n");
+ beginStage(project);
ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
int size = project.getChildExps().size();
@@ -70,9 +77,9 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
res[i] = project.getChildExps().get(i).accept(compiler);
}
- pw.print(String.format(" return new Values(%s);\n", Joiner.on(',').join
- (res)));
- endFunction();
+ pw.print(String.format(" ctx.emit(new Values(%s));\n",
+ Joiner.on(',').join(res)));
+ endStage();
return null;
}
@@ -85,23 +92,21 @@ class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
Void visitTableScan(TableScan scan) throws Exception {
String tableName = Joiner.on('_').join(scan.getTable().getQualifiedName());
referredTables.add(tableName);
- beginFunction(scan);
- pw.print(String.format(" return _datasources[TABLE_%s].next();\n",
- tableName));
- endFunction();
+ beginStage(scan);
+ pw.print(" ctx.emit(_data);\n");
+ endStage();
return null;
}
- private void beginFunction(RelNode n) {
- pw.print(String.format("private Values %s(%s) {\n", getFunctionName(n), n
- .getInputs().isEmpty() ? "" : "Values _data"));
+ private void beginStage(RelNode n) {
+ pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
}
- private void endFunction() {
- pw.print("}\n");
+ private void endStage() {
+ pw.print(" }\n };\n");
}
- static String getFunctionName(RelNode n) {
- return n.getClass().getSimpleName() + "_" + n.getId();
+ static String getStageName(RelNode n) {
+ return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
new file mode 100644
index 0000000..cf76964
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (C) 2010 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.javac;
+
+
+import javax.tools.DiagnosticListener;
+import javax.tools.FileObject;
+import javax.tools.ForwardingJavaFileManager;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileManager;
+import javax.tools.JavaFileObject;
+import javax.tools.SimpleJavaFileObject;
+import javax.tools.ToolProvider;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.singleton;
+
+/**
+ * This is a Java ClassLoader that compiles a string of source code in memory and serves the
+ * resulting classes.
+ *
+ * <h3>Example</h3>
+ *
+ * <pre>
+ * String className = "com.foo.MyClass";
+ * String classSource =
+ * "package com.foo;\n" +
+ * "public class MyClass implements Runnable {\n" +
+ * " @Override public void run() {\n" +
+ * " log(\"Hello world\");\n" +
+ * " }\n" +
+ * "}";
+ *
+ * // Compile the source; throws CompilerException if it does not compile.
+ * ClassLoader classLoader = new CompilingClassLoader(
+ * parentClassLoader, className, classSource, null);
+ * Class myClass = classLoader.loadClass(className);
+ *
+ * // Use it.
+ * Runnable instance = (Runnable)myClass.newInstance();
+ * instance.run();
+ * </pre>
+ *
+ * Only one chunk of source can be compiled per instance of CompilingClassLoader. If you need to
+ * compile more, create multiple CompilingClassLoader instances.
+ *
+ * Uses Java 1.6's in built compiler API, so a JDK (not just a JRE) is required at runtime.
+ *
+ * Compilation happens in the constructor. If the source cannot be compiled, the constructor
+ * throws a {@link CompilerException}. Compile errors are reported to the
+ * {@link javax.tools.DiagnosticListener} passed to the constructor, or logged to System.err when
+ * that listener is null.
+ *
+ * @see java.lang.ClassLoader
+ * @see javax.tools.JavaCompiler
+ */
+public class CompilingClassLoader extends ClassLoader {
+
+  /**
+   * Thrown when code cannot be compiled.
+   */
+  public static class CompilerException extends Exception {
+    private static final long serialVersionUID = -2936958840023603270L;
+
+    public CompilerException(String message) {
+      super(message);
+    }
+
+    public CompilerException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /** Compiled byte code, keyed by the class name the compiler wrote it under. */
+  private final Map<String, ByteArrayOutputStream> byteCodeForClasses = new HashMap<>();
+
+  // Needed to keep SimpleFileObject constructor happy.
+  private static final URI EMPTY_URI = URI.create("");
+
+  /**
+   * Compiles {@code sourceCode} immediately; the resulting classes can then be loaded through
+   * {@link ClassLoader#loadClass(String)}.
+   *
+   * @param parent Parent classloader to resolve dependencies from.
+   * @param className Name of class to compile. eg. "com.foo.MyClass".
+   * @param sourceCode Java source for class. e.g. "package com.foo; class MyClass { ... }".
+   * @param diagnosticListener Notified of compiler errors (may be null).
+   * @throws CompilerException if no system Java compiler is available, if the source does not
+   *     compile, or if the compiler's file manager cannot be closed.
+   */
+  public CompilingClassLoader(
+      ClassLoader parent,
+      String className,
+      String sourceCode,
+      DiagnosticListener<JavaFileObject> diagnosticListener)
+      throws CompilerException {
+    super(parent);
+    if (!compileSourceCodeToByteCode(className, sourceCode, diagnosticListener)) {
+      throw new CompilerException("Could not compile " + className);
+    }
+  }
+
+  /**
+   * Override ClassLoader's class resolving method. Don't call this directly, instead use
+   * {@link ClassLoader#loadClass(String)}.
+   *
+   * @throws ClassNotFoundException if {@code name} was not produced by the compilation.
+   */
+  @Override
+  public Class<?> findClass(String name) throws ClassNotFoundException {
+    ByteArrayOutputStream byteCode = byteCodeForClasses.get(name);
+    if (byteCode == null) {
+      throw new ClassNotFoundException(name);
+    }
+    byte[] bytes = byteCode.toByteArray();
+    return defineClass(name, bytes, 0, bytes.length);
+  }
+
+  /**
+   * Runs javac over the given source, collecting the output in {@link #byteCodeForClasses}.
+   *
+   * @return Whether compilation was successful.
+   * @throws CompilerException if no system compiler is available or the file manager fails to
+   *     close.
+   */
+  private boolean compileSourceCodeToByteCode(
+      String className, String sourceCode, DiagnosticListener<JavaFileObject> diagnosticListener)
+      throws CompilerException {
+    JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
+    if (javaCompiler == null) {
+      // ToolProvider returns null when no compiler is provided, e.g. on a plain JRE.
+      throw new CompilerException(
+          "Could not compile " + className + ": no system Java compiler is available");
+    }
+
+    JavaFileObject javaFile = new InMemoryJavaFile(className, sourceCode);
+
+    // Javac option: remove these when the javac zip impl is fixed
+    // (http://b/issue?id=1822932)
+    System.setProperty("useJavaUtilZip", "true"); // setting value to any non-null string
+    List<String> options = new LinkedList<>();
+    // this is ignored by javac currently but useJavaUtilZip should be
+    // a valid javac XD option, which is another bug
+    options.add("-XDuseJavaUtilZip");
+
+    // Set up the in-memory filesystem; closing it also closes the wrapped standard file manager.
+    try (InMemoryFileManager fileManager =
+        new InMemoryFileManager(javaCompiler.getStandardFileManager(null, null, null))) {
+      // Now compile!
+      JavaCompiler.CompilationTask compilationTask =
+          javaCompiler.getTask(
+              null, // Null: log any unhandled errors to stderr.
+              fileManager,
+              diagnosticListener,
+              options,
+              null,
+              singleton(javaFile));
+      return compilationTask.call();
+    } catch (IOException e) {
+      throw new CompilerException("Could not close the compiler file manager for " + className, e);
+    }
+  }
+
+  /**
+   * Provides an in-memory representation of JavaFileManager abstraction, so we do not need to write
+   * any files to disk.
+   *
+   * When files are written to, rather than putting the bytes on disk, they are appended to buffers
+   * in byteCodeForClasses.
+   *
+   * @see javax.tools.JavaFileManager
+   */
+  private class InMemoryFileManager extends ForwardingJavaFileManager<JavaFileManager> {
+    public InMemoryFileManager(JavaFileManager fileManager) {
+      super(fileManager);
+    }
+
+    @Override
+    public JavaFileObject getJavaFileForOutput(
+        Location location, final String className, JavaFileObject.Kind kind, FileObject sibling)
+        throws IOException {
+      return new SimpleJavaFileObject(EMPTY_URI, kind) {
+        @Override
+        public OutputStream openOutputStream() throws IOException {
+          ByteArrayOutputStream outputStream = byteCodeForClasses.get(className);
+          if (outputStream != null) {
+            throw new IllegalStateException("Cannot write more than once");
+          }
+          // Reasonable size for a simple .class.
+          outputStream = new ByteArrayOutputStream(256);
+          byteCodeForClasses.put(className, outputStream);
+          return outputStream;
+        }
+      };
+    }
+  }
+
+  /** A source "file" whose content is the string given to the constructor. */
+  private static class InMemoryJavaFile extends SimpleJavaFileObject {
+    private final String sourceCode;
+
+    public InMemoryJavaFile(String className, String sourceCode) {
+      super(makeUri(className), Kind.SOURCE);
+      this.sourceCode = sourceCode;
+    }
+
+    /** Maps "com.foo.MyClass" to the URI "com/foo/MyClass.java". */
+    private static URI makeUri(String className) {
+      try {
+        return new URI(className.replace('.', '/') + Kind.SOURCE.extension);
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e); // Not sure what could cause this.
+      }
+    }
+
+    @Override
+    public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
+      return sourceCode;
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
new file mode 100644
index 0000000..8d22a53
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import backtype.storm.tuple.Values;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** End-to-end test: SQL text to compiled processor to collected output rows. */
+public class TestPlanCompiler {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  /**
+   * Compiles a filter-plus-projection query over the dummy table FOO, feeds it
+   * the mock data source and checks the rows that reach the result handler.
+   */
+  @Test
+  public void testCompile() throws Exception {
+    TestUtils.CalciteState state =
+        TestUtils.sqlOverDummyTable("SELECT ID + 1 FROM FOO WHERE ID > 2");
+    AbstractValuesProcessor processor =
+        new PlanCompiler(typeFactory).compile(state.tree);
+
+    Map<String, DataSource> sources = new HashMap<>();
+    sources.put("FOO", new TestUtils.MockDataSource());
+    List<Values> collected = new ArrayList<>();
+    ChannelHandler sink = new TestUtils.CollectDataChannelHandler(collected);
+
+    processor.initialize(sources, sink);
+
+    // The mock source emits IDs 0..4; only 3 and 4 pass ID > 2, then ID + 1.
+    Values[] expected = {new Values(4), new Values(5)};
+    Assert.assertArrayEquals(expected, collected.toArray());
+  }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
index 61f5409..cedb48b 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
@@ -38,16 +38,6 @@ public class TestRelNodeCompiler {
RelDataTypeSystem.DEFAULT);
LogicalProject project = (LogicalProject) state.tree;
LogicalFilter filter = (LogicalFilter) project.getInput();
- TableScan scan = (TableScan) filter.getInput();
-
- try (StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw)
- ) {
- RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
- compiler.visitTableScan(scan);
- pw.flush();
- Assert.assertTrue(sw.toString().contains("_datasources[TABLE_FOO]"));
- }
try (StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
index ae4300a..6731c90 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestUtils.java
@@ -1,13 +1,11 @@
package org.apache.storm.sql.compiler;
-import com.google.common.collect.ImmutableList;
+import backtype.storm.tuple.Values;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.Schema;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
@@ -17,15 +15,21 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.storm.sql.storm.ChannelContext;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
import java.util.ArrayList;
+import java.util.List;
-class TestUtils {
+public class TestUtils {
static CalciteState sqlOverDummyTable(String sql)
throws RelConversionException, ValidationException, SqlParseException {
SchemaPlus schema = Frameworks.createRootSchema(true);
- Table table = newTable().field("ID", SqlTypeName.INTEGER).build();
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
+ (RelDataTypeSystem.DEFAULT);
+ Table table = new CompilerUtil.TableBuilderInfo(typeFactory)
+ .field("ID", SqlTypeName.INTEGER).build();
schema.add("FOO", table);
FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
schema).build();
@@ -36,90 +40,53 @@ class TestUtils {
return new CalciteState(schema, tree);
}
- static class TableBuilderInfo {
- private static class FieldType {
- private static final int NO_PRECISION = -1;
- private final String name;
- private final SqlTypeName type;
- private final int precision;
-
- private FieldType(String name, SqlTypeName type, int precision) {
- this.name = name;
- this.type = type;
- this.precision = precision;
- }
+ static class CalciteState {
+ final SchemaPlus schema;
+ final RelNode tree;
- private FieldType(String name, SqlTypeName type) {
- this(name, type, NO_PRECISION);
- }
+ private CalciteState(SchemaPlus schema, RelNode tree) {
+ this.schema = schema;
+ this.tree = tree;
}
+ }
- private final ArrayList<FieldType> fields = new ArrayList<>();
- private final ArrayList<Object[]> rows = new ArrayList<>();
- private Statistic stats;
+ public static class MockDataSource implements DataSource {
+ private final ArrayList<Values> RECORDS = new ArrayList<>();
- TableBuilderInfo field(String name, SqlTypeName type) {
- fields.add(new FieldType(name, type));
- return this;
+ public MockDataSource() {
+ for (int i = 0; i < 5; ++i) {
+ RECORDS.add(new Values(i));
+ }
}
- TableBuilderInfo field(String name, SqlTypeName type, int precision) {
- fields.add(new FieldType(name, type, precision));
- return this;
+ @Override
+ public void open(ChannelContext ctx) {
+ for (Values v : RECORDS) {
+ ctx.emit(v);
+ }
+ ctx.fireChannelInactive();
}
+ }
- TableBuilderInfo statistics(Statistic stats) {
- this.stats = stats;
- return this;
- }
+ public static class CollectDataChannelHandler implements ChannelHandler {
+ private final List<Values> values;
- TableBuilderInfo rows(Object[] data) {
- rows.add(data);
- return this;
+ public CollectDataChannelHandler(List<Values> values) {
+ this.values = values;
}
- Table build() {
- final Statistic stat = stats;
- return new Table() {
- @Override
- public RelDataType getRowType(
- RelDataTypeFactory relDataTypeFactory) {
- RelDataTypeFactory.FieldInfoBuilder b = relDataTypeFactory.builder();
- for (FieldType f : fields) {
- if (f.precision == FieldType.NO_PRECISION) {
- b.add(f.name, f.type);
- } else {
- b.add(f.name, f.type, f.precision);
- }
- }
- return b.build();
- }
-
- @Override
- public Statistic getStatistic() {
- return stat != null ? stat : Statistics.of(rows.size(),
- ImmutableList.<ImmutableBitSet>of());
- }
-
- @Override
- public Schema.TableType getJdbcTableType() {
- return Schema.TableType.TABLE;
- }
- };
+ @Override
+ public void dataReceived(ChannelContext ctx, Values data) {
+ values.add(data);
}
- }
- static TableBuilderInfo newTable() {
- return new TableBuilderInfo();
- }
+ @Override
+ public void channelInactive(ChannelContext ctx) {}
- static class CalciteState {
- final SchemaPlus schema;
- final RelNode tree;
-
- private CalciteState(SchemaPlus schema, RelNode tree) {
- this.schema = schema;
- this.tree = tree;
+ @Override
+ public void exceptionCaught(Throwable cause) {
+ throw new RuntimeException(cause);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
new file mode 100644
index 0000000..62f2d95
--- /dev/null
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-sql-runtime</artifactId>
+
+ <developers>
+ <developer>
+ <id>haohui</id>
+ <name>Haohui Mai</name>
+ <email>ricetons@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>src/test</testSourceDirectory>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
new file mode 100644
index 0000000..cf110e3
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/AbstractChannelHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+
+public abstract class AbstractChannelHandler implements ChannelHandler {
+ @Override
+ public abstract void dataReceived(ChannelContext ctx, Values data);
+
+ @Override
+ public void channelInactive(ChannelContext ctx) {
+
+ }
+
+ @Override
+ public void exceptionCaught(Throwable cause) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
new file mode 100644
index 0000000..a2806b2
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelContext.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+
+public interface ChannelContext {
+ /**
+ * Emit data to the next stage of the data pipeline.
+ */
+ void emit(Values data);
+ void fireChannelInactive();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
new file mode 100644
index 0000000..8cd3a28
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ChannelHandler.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+
+/**
+ * ChannelHandler provides an event-driven interface for the user to process
+ * a series of events.
+ */
+public interface ChannelHandler {
+ void dataReceived(ChannelContext ctx, Values data);
+
+ /**
+ * The producer of the data has indicated that the channel is no longer
+ * active.
+ * @param ctx
+ */
+ void channelInactive(ChannelContext ctx);
+
+ void exceptionCaught(Throwable cause);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
new file mode 100644
index 0000000..b5bb619
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/Channels.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+
+public class Channels {
+ private static final ChannelContext VOID_CTX = new ChannelContext() {
+ @Override
+ public void emit(Values data) {}
+
+ @Override
+ public void fireChannelInactive() {}
+ };
+
+ private static class ChannelContextAdapter implements ChannelContext {
+ private final ChannelHandler handler;
+ private final ChannelContext next;
+
+ public ChannelContextAdapter(
+ ChannelContext next, ChannelHandler handler) {
+ this.handler = handler;
+ this.next = next;
+ }
+
+ @Override
+ public void emit(Values data) {
+ handler.dataReceived(next, data);
+ }
+
+ @Override
+ public void fireChannelInactive() {
+ handler.channelInactive(next);
+ }
+ }
+
+ private static class ForwardingChannelContext implements ChannelContext {
+ private final ChannelContext next;
+
+ public ForwardingChannelContext(ChannelContext next) {
+ this.next = next;
+ }
+
+ @Override
+ public void emit(Values data) {
+ next.emit(data);
+ }
+
+ @Override
+ public void fireChannelInactive() {
+ next.fireChannelInactive();
+ }
+ }
+
+ public static ChannelContext chain(
+ ChannelContext next, ChannelHandler handler) {
+ return new ChannelContextAdapter(next, handler);
+ }
+
+ public static ChannelContext voidContext() {
+ return VOID_CTX;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
new file mode 100644
index 0000000..84fa6e0
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/DataSource.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+/**
+ * A DataSource ingests data in StormSQL. It provides a series of tuples to
+ * the downstream {@link ChannelHandler}.
+ *
+ */
+public interface DataSource {
+ void open(ChannelContext ctx);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0a257f12/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
new file mode 100644
index 0000000..bd068be
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
@@ -0,0 +1,29 @@
+package org.apache.storm.sql.storm.runtime;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+
+import java.util.Map;
+
+/**
+ * A subclass of AbstractValuesProcessor provides a series of tuples. It
+ * takes a series of iterators of {@link Values} and produces a stream of
+ * tuples.
+ *
+ * The subclass implements the {@code next()} method to provide
+ * the output of the stream. It can choose to return null in {@code next()} to
+ * indicate that this particular iteration is a no-op. SQL processors depend
+ * on these semantics to implement filtering and nullable records.
+ */
+public abstract class AbstractValuesProcessor {
+
+ /**
+ * Initialize the data sources.
+ *
+ * @param data a map from the table name to the {@link DataSource} of that table.
+ *
+ */
+ public abstract void initialize(Map<String, DataSource> data, ChannelHandler
+ result);
+}