You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/10/27 18:08:13 UTC
[9/9] storm git commit: Compile logical plans to Java code.
Compile logical plans to Java code.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2955a789
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2955a789
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2955a789
Branch: refs/heads/STORM-1040
Commit: 2955a78937cfafd7db40276a4893165db9c38f3d
Parents: 54723fa
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Oct 22 16:52:59 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Oct 26 10:04:16 2015 -0700
----------------------------------------------------------------------
external/sql/pom.xml | 1 +
external/sql/storm-sql-core/pom.xml | 6 +
.../apache/storm/sql/compiler/PlanCompiler.java | 136 ++++++++++++
.../storm/sql/javac/CompilingClassLoader.java | 221 +++++++++++++++++++
.../storm/sql/compiler/TestPlanCompiler.java | 62 ++++++
external/sql/storm-sql-runtime/pom.xml | 65 ++++++
.../apache/storm/sql/storm/ValueIterator.java | 55 +++++
.../storm/runtime/AbstractValuesProcessor.java | 47 ++++
.../storm/sql/storm/TestValueIterator.java | 107 +++++++++
9 files changed, 700 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
index 73e7b31..e4dae94 100644
--- a/external/sql/pom.xml
+++ b/external/sql/pom.xml
@@ -38,5 +38,6 @@
<modules>
<module>storm-sql-core</module>
+ <module>storm-sql-runtime</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-core/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml
index 562f12f..0977c45 100644
--- a/external/sql/storm-sql-core/pom.xml
+++ b/external/sql/storm-sql-core/pom.xml
@@ -43,6 +43,12 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-sql-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>${calcite.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
new file mode 100644
index 0000000..5d910e7
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class PlanCompiler {
+ private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
+ private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
+ private static final String PROLOGUE = NEW_LINE_JOINER.join(
+ "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
+ "import java.util.Iterator;", "import java.util.Map;",
+ "import backtype.storm.tuple.Values;",
+ "import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;", "",
+ "public final class Processor extends AbstractValuesProcessor {", "",
+ "@Override",
+ "protected Iterator<Values>[] getDataSource() { return _datasources; }","");
+ private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+
+ private String generateJavaSource(RelNode root) throws Exception {
+ StringWriter sw = new StringWriter();
+ try (PrintWriter pw = new PrintWriter(sw)) {
+ RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+ printPrologue(pw);
+ compiler.traverse(root);
+ printTableReference(pw, compiler.getReferredTables());
+ printEpilogue(pw, root);
+ }
+ return sw.toString();
+ }
+
+ private void printTableReference(PrintWriter pw, Set<String> referredTables) {
+ Map<String, Integer> ids = new HashMap<>();
+ for (String s : referredTables) {
+ pw.print(String.format(" private static final int TABLE_%s = %d;\n", s,
+ ids.size()));
+ ids.put(s, ids.size());
+ }
+
+ pw.print(" @SuppressWarnings(\"unchecked\")\n");
+ pw.print(String.format(
+ " private final Iterator<Values>[] _datasources = new Iterator[%d];\n",
+ referredTables.size()));
+ pw.print(
+ " @Override public void initialize(Map<String, Iterator<Values>> " +
+ "data) {\n");
+ for (String s : referredTables) {
+ String escaped = CompilerUtil.escapeJavaString(s, true);
+ String r = NEW_LINE_JOINER.join(" if (!data.containsKey(%1$s))",
+ " throw new RuntimeException(\"Cannot find table \" + %1$s);",
+ " _datasources[%2$d] = data.get(%1$s);");
+ pw.print(String.format(r, escaped, ids.get(s)));
+ }
+ pw.print("}\n");
+ }
+
+ public AbstractValuesProcessor compile(RelNode plan) throws Exception {
+ String javaCode = generateJavaSource(plan);
+ ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
+ PACKAGE_NAME + ".Processor",
+ javaCode, null);
+ AbstractValuesProcessor processor = (AbstractValuesProcessor) cl.loadClass(
+ PACKAGE_NAME + ".Processor").newInstance();
+ return processor;
+ }
+
+ private static void printEpilogue(
+ PrintWriter pw, RelNode root) throws Exception {
+ pw.print(" @Override public Values next() {\n");
+ MainBodyCompiler compiler = new MainBodyCompiler(pw);
+ compiler.traverse(root);
+ pw.print(String.format(" return t%d;\n", root.getId()));
+ pw.print(" }\n");
+ pw.print("}\n");
+ }
+
+ private static void printPrologue(PrintWriter pw) {
+ pw.append(PROLOGUE);
+ }
+
+ private static class MainBodyCompiler extends PostOrderRelNodeVisitor<Void> {
+ private final PrintWriter pw;
+
+ private MainBodyCompiler(PrintWriter pw) {
+ this.pw = pw;
+ }
+
+ @Override
+ Void defaultValue(RelNode n) {
+ String params;
+ if (n instanceof TableScan) {
+ params = "";
+ } else {
+ ArrayList<String> inputIds = new ArrayList<>();
+ for (RelNode i : n.getInputs()) {
+ inputIds.add("t" + i.getId());
+ }
+ params = Joiner.on(",").join(inputIds);
+ }
+ String l = String.format(" Values t%d = %s(%s);\n", n.getId(),
+ RelNodeCompiler.getFunctionName(n), params);
+ pw.print(l);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
new file mode 100644
index 0000000..cf76964
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (C) 2010 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.javac;
+
+
+import javax.tools.DiagnosticListener;
+import javax.tools.FileObject;
+import javax.tools.ForwardingJavaFileManager;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileManager;
+import javax.tools.JavaFileObject;
+import javax.tools.SimpleJavaFileObject;
+import javax.tools.ToolProvider;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.singleton;
+
+/**
+ * This is a Java ClassLoader that will attempt to load a class from a string of source code.
+ *
+ * <h3>Example</h3>
+ *
+ * <pre>
+ * String className = "com.foo.MyClass";
+ * String classSource =
+ * "package com.foo;\n" +
+ * "public class MyClass implements Runnable {\n" +
+ * " @Override public void run() {\n" +
+ * " log(\"Hello world\");\n" +
+ * " }\n" +
+ * "}";
+ *
+ * // Load class from source.
+ * ClassLoader classLoader = new CompilingClassLoader(
+ * parentClassLoader, className, classSource);
+ * Class myClass = classLoader.loadClass(className);
+ *
+ * // Use it.
+ * Runnable instance = (Runnable)myClass.newInstance();
+ * instance.run();
+ * </pre>
+ *
+ * Only one chunk of source can be compiled per instance of CompilingClassLoader. If you need to
+ * compile more, create multiple CompilingClassLoader instances.
+ *
+ * Uses Java 1.6's in built compiler API.
+ *
+ * If the class cannot be compiled, loadClass() will throw a ClassNotFoundException and log the
+ * compile errors to System.err. If you don't want the messages logged, or want to explicitly handle
+ * the messages you can provide your own {@link javax.tools.DiagnosticListener} through
+ * {#setDiagnosticListener()}.
+ *
+ * @see java.lang.ClassLoader
+ * @see javax.tools.JavaCompiler
+ */
+public class CompilingClassLoader extends ClassLoader {
+
+ /**
+ * Thrown when code cannot be compiled.
+ */
+ public static class CompilerException extends Exception {
+ private static final long serialVersionUID = -2936958840023603270L;
+
+ public CompilerException(String message) {
+ super(message);
+ }
+ }
+
+ private final Map<String, ByteArrayOutputStream> byteCodeForClasses = new HashMap<>();
+
+ private static final URI EMPTY_URI;
+
+ static {
+ try {
+ // Needed to keep SimpleFileObject constructor happy.
+ EMPTY_URI = new URI("");
+ } catch (URISyntaxException e) {
+ throw new Error(e);
+ }
+ }
+
+ /**
+ * @param parent Parent classloader to resolve dependencies from.
+ * @param className Name of class to compile. eg. "com.foo.MyClass".
+ * @param sourceCode Java source for class. e.g. "package com.foo; class MyClass { ... }".
+ * @param diagnosticListener Notified of compiler errors (may be null).
+ */
+ public CompilingClassLoader(
+ ClassLoader parent,
+ String className,
+ String sourceCode,
+ DiagnosticListener<JavaFileObject> diagnosticListener)
+ throws CompilerException {
+ super(parent);
+ if (!compileSourceCodeToByteCode(className, sourceCode, diagnosticListener)) {
+ throw new CompilerException("Could not compile " + className);
+ }
+ }
+
+ /**
+ * Override ClassLoader's class resolving method. Don't call this directly, instead use
+ * {@link ClassLoader#loadClass(String)}.
+ */
+ @Override
+ public Class<?> findClass(String name) throws ClassNotFoundException {
+ ByteArrayOutputStream byteCode = byteCodeForClasses.get(name);
+ if (byteCode == null) {
+ throw new ClassNotFoundException(name);
+ }
+ return defineClass(name, byteCode.toByteArray(), 0, byteCode.size());
+ }
+
+ /**
+ * @return Whether compilation was successful.
+ */
+ private boolean compileSourceCodeToByteCode(
+ String className, String sourceCode, DiagnosticListener<JavaFileObject> diagnosticListener) {
+ JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
+
+ // Set up the in-memory filesystem.
+ InMemoryFileManager fileManager =
+ new InMemoryFileManager(javaCompiler.getStandardFileManager(null, null, null));
+ JavaFileObject javaFile = new InMemoryJavaFile(className, sourceCode);
+
+ // Javac option: remove these when the javac zip impl is fixed
+ // (http://b/issue?id=1822932)
+ System.setProperty("useJavaUtilZip", "true"); // setting value to any non-null string
+ List<String> options = new LinkedList<>();
+ // this is ignored by javac currently but useJavaUtilZip should be
+ // a valid javac XD option, which is another bug
+ options.add("-XDuseJavaUtilZip");
+
+ // Now compile!
+ JavaCompiler.CompilationTask compilationTask =
+ javaCompiler.getTask(
+ null, // Null: log any unhandled errors to stderr.
+ fileManager,
+ diagnosticListener,
+ options,
+ null,
+ singleton(javaFile));
+ return compilationTask.call();
+ }
+
+ /**
+ * Provides an in-memory representation of JavaFileManager abstraction, so we do not need to write
+ * any files to disk.
+ *
+ * When files are written to, rather than putting the bytes on disk, they are appended to buffers
+ * in byteCodeForClasses.
+ *
+ * @see javax.tools.JavaFileManager
+ */
+ private class InMemoryFileManager extends ForwardingJavaFileManager<JavaFileManager> {
+ public InMemoryFileManager(JavaFileManager fileManager) {
+ super(fileManager);
+ }
+
+ @Override
+ public JavaFileObject getJavaFileForOutput(
+ Location location, final String className, JavaFileObject.Kind kind, FileObject sibling)
+ throws IOException {
+ return new SimpleJavaFileObject(EMPTY_URI, kind) {
+ @Override
+ public OutputStream openOutputStream() throws IOException {
+ ByteArrayOutputStream outputStream = byteCodeForClasses.get(className);
+ if (outputStream != null) {
+ throw new IllegalStateException("Cannot write more than once");
+ }
+ // Reasonable size for a simple .class.
+ outputStream = new ByteArrayOutputStream(256);
+ byteCodeForClasses.put(className, outputStream);
+ return outputStream;
+ }
+ };
+ }
+ }
+
+ private static class InMemoryJavaFile extends SimpleJavaFileObject {
+ private final String sourceCode;
+
+ public InMemoryJavaFile(String className, String sourceCode) {
+ super(makeUri(className), Kind.SOURCE);
+ this.sourceCode = sourceCode;
+ }
+
+ private static URI makeUri(String className) {
+ try {
+ return new URI(className.replaceAll("\\.", "/") + Kind.SOURCE.extension);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e); // Not sure what could cause this.
+ }
+ }
+
+ @Override
+ public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
+ return sourceCode;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
new file mode 100644
index 0000000..768a97e
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.storm.ValueIterator;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class TestPlanCompiler {
+ private static final List<Values> INPUTS;
+
+ static {
+ ArrayList<Values> records = new ArrayList<>();
+ for (int i = 0; i < 5; ++i) {
+ records.add(new Values(i));
+ }
+ INPUTS = Collections.unmodifiableList(records);
+ }
+
+ @Test
+ public void testCompile() throws Exception {
+ String sql = "SELECT ID + 1 FROM FOO WHERE ID > 2";
+ TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+ PlanCompiler compiler = new PlanCompiler();
+ AbstractValuesProcessor proc = compiler.compile(state.tree);
+ Map<String, Iterator<Values>> data = new HashMap<>();
+ data.put("FOO", INPUTS.iterator());
+ proc.initialize(data);
+ ValueIterator v = new ValueIterator(proc);
+ List<Integer> results = new ArrayList<>();
+ while(v.hasNext()) {
+ results.add((Integer) v.next().get(0));
+ }
+ Assert.assertEquals(2, results.size());
+ Assert.assertEquals(4, results.get(0).intValue());
+ Assert.assertEquals(5, results.get(1).intValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
new file mode 100644
index 0000000..62f2d95
--- /dev/null
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-sql-runtime</artifactId>
+
+ <developers>
+ <developer>
+ <id>haohui</id>
+ <name>Haohui Mai</name>
+ <email>ricetons@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>src/test</testSourceDirectory>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ValueIterator.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ValueIterator.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ValueIterator.java
new file mode 100644
index 0000000..d32fdbf
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/ValueIterator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+
+import java.util.Iterator;
+
+/**
+ * A ValuesIterator wraps an instance of {@link AbstractValuesProcessor} to
+ * provide an iterator of non-null {@link Values}.
+ */
+public class ValueIterator implements Iterator<Values> {
+ private final AbstractValuesProcessor processor;
+ private Values next;
+ public ValueIterator(AbstractValuesProcessor processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public Values next() {
+ Values oldNext = next;
+ next = null;
+ return oldNext;
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (processor.mayHasNext() && next == null) {
+ next = processor.next();
+ }
+ return next != null;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
new file mode 100644
index 0000000..97ce217
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/storm/runtime/AbstractValuesProcessor.java
@@ -0,0 +1,47 @@
+package org.apache.storm.sql.storm.runtime;
+
+import backtype.storm.tuple.Values;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Subclass of AbstractTupleProcessor provides a series of tuple. It
+ * takes a series of iterators of {@link Values} and produces a stream of
+ * tuple.
+ *
+ * The subclass implements the {@see next()} method to provide
+ * the output of the stream. It can choose to return null in {@see next()} to
+ * indicate that this particular iteration is a no-op. SQL processors depend
+ * on this semantic to implement filtering and nullable records.
+ */
+public abstract class AbstractValuesProcessor {
+
+ /**
+ * @return the next tuple in the output stream, null if the tuple should be
+ * skipped.
+ */
+ public abstract Values next();
+
+ /**
+ * Initialize the data sources.
+ *
+ * @param data a map from the table name to the iterators of the values.
+ *
+ */
+ public abstract void initialize(Map<String, Iterator<Values>> data);
+
+ protected abstract Iterator<Values>[] getDataSource();
+ /**
+ * Does the output stream potentially has more output.
+ */
+ public boolean mayHasNext() {
+ Iterator<Values>[] d = getDataSource();
+ for (int i = 0; i < d.length; ++i) {
+ if (d[i].hasNext()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2955a789/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/storm/TestValueIterator.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/storm/TestValueIterator.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/storm/TestValueIterator.java
new file mode 100644
index 0000000..e74257f
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/storm/TestValueIterator.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.storm;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestValueIterator {
+
+ private static final List<Values> INPUTS;
+
+ static {
+ ArrayList<Values> records = new ArrayList<>();
+ for (int i = 0; i < 5; ++i) {
+ records.add(new Values(i));
+ }
+ INPUTS = Collections.unmodifiableList(records);
+ }
+
+ private static Iterator<Values> mockedInput() {
+ return INPUTS.iterator();
+ }
+
+ private static Iterator<Values>[] getDataSource() {
+ @SuppressWarnings("unchecked")
+ Iterator<Values>[] v = new Iterator[1];
+ v[0] = mockedInput();
+ return v;
+ }
+
+ @Test
+ public void testPassThrough() {
+ final Iterator<Values>[] source = getDataSource();
+ AbstractValuesProcessor processor = new AbstractValuesProcessor() {
+ @Override
+ public Values next() {
+ return source[0].next();
+ }
+
+ @Override
+ public void initialize(
+ Map<String, Iterator<Values>> data) {
+
+ }
+
+ @Override
+ protected Iterator<Values>[] getDataSource() {
+ return source;
+ }
+ };
+ ValueIterator it = new ValueIterator(processor);
+ ArrayList<Values> records = Lists.newArrayList(it);
+ assertEquals(INPUTS.size(), records.size());
+ }
+
+ @Test
+ public void testFilter() {
+ final Iterator<Values>[] source = getDataSource();
+ AbstractValuesProcessor processor = new AbstractValuesProcessor() {
+ @Override
+ public Values next() {
+ Values t = source[0].next();
+ return (int) t.get(0) < 2 ? t : null;
+ }
+
+ @Override
+ public void initialize(
+ Map<String, Iterator<Values>> data) {
+
+ }
+
+ @Override
+ protected Iterator<Values>[] getDataSource() {
+ return source;
+ }
+
+ };
+ ValueIterator it = new ValueIterator(processor);
+ ArrayList<Values> records = Lists.newArrayList(it);
+ assertEquals(2, records.size());
+ }
+}