Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:10:00 UTC

[14/20] storm git commit: [StormSQL] STORM-1181. Compile SQLs into Trident topology and execute them in LocalCluster.

[StormSQL] STORM-1181. Compile SQLs into Trident topology and execute them in LocalCluster.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7e378c65
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7e378c65
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7e378c65

Branch: refs/heads/master
Commit: 7e378c65bc18ebcca3e500ea70f6da3f376003c3
Parents: 82f10eb
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Nov 16 14:48:43 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:28 2015 -0800

----------------------------------------------------------------------
 .../compiler/backends/trident/PlanCompiler.java | 194 +++++++++++++++++++
 .../backends/trident/RelNodeCompiler.java       | 116 +++++++++++
 .../backends/trident/TestPlanCompiler.java      | 116 +++++++++++
 external/sql/storm-sql-runtime/pom.xml          |  13 ++
 .../sql/runtime/ISqlTridentDataSource.java      |   3 +-
 .../trident/AbstractTridentProcessor.java       |  43 ++++
 .../test/org/apache/storm/sql/TestUtils.java    |  88 +++++++++
 7 files changed, 572 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
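
For orientation, the pieces in this patch compose into a single pipeline: Calcite parses and plans the SQL, PlanCompiler turns the plan into Java source, CompilingClassLoader loads it, and the resulting processor wires up a Trident topology that runs in LocalCluster. A minimal sketch of that flow, assuming typeFactory and the sources map are prepared as in TestPlanCompiler below:

    // Sketch only -- mirrors TestPlanCompiler; "typeFactory" and "sources"
    // are assumed to be set up as in that test.
    TestCompilerUtils.CalciteState state =
        TestCompilerUtils.sqlOverDummyTable("SELECT ID FROM FOO WHERE ID > 2");
    AbstractTridentProcessor proc =
        new PlanCompiler(typeFactory).compile(state.tree()); // plan -> Java -> class
    TridentTopology topo = proc.build(sources);              // wire spout and functions
    new LocalCluster().submitTopology("storm-sql", new Config(), topo.build());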


http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
new file mode 100644
index 0000000..f8bfd12
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.compiler.CompilerUtil;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class PlanCompiler {
+  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
+  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
+  private static final String PROLOGUE = NEW_LINE_JOINER.join(
+      "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
+      "import java.util.List;",
+      "import java.util.Map;",
+      "import backtype.storm.tuple.Fields;",
+      "import backtype.storm.tuple.Values;",
+      "import org.apache.storm.sql.runtime.ISqlTridentDataSource;",
+      "import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;",
+      "import storm.trident.Stream;",
+      "import storm.trident.TridentTopology;",
+      "import storm.trident.fluent.IAggregatableStream;",
+      "import storm.trident.operation.TridentCollector;",
+      "import storm.trident.operation.BaseFunction;",
+      "import storm.trident.spout.IBatchSpout;",
+      "import storm.trident.tuple.TridentTuple;",
+      "",
+      "public final class TridentProcessor extends AbstractTridentProcessor {",
+      "");
+  private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
+      "  @Override",
+      "  public TridentTopology build(Map<String, ISqlTridentDataSource> _sources) {",
+      "    TridentTopology topo = new TridentTopology();",
+      ""
+  );
+
+  private final JavaTypeFactory typeFactory;
+  public PlanCompiler(JavaTypeFactory typeFactory) {
+    this.typeFactory = typeFactory;
+  }
+
+  private String generateJavaSource(RelNode root) throws Exception {
+    StringWriter sw = new StringWriter();
+    try (PrintWriter pw = new PrintWriter(sw)) {
+      RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+      printPrologue(pw);
+      compiler.traverse(root);
+      printMain(pw, root);
+      printEpilogue(pw);
+    }
+    return sw.toString();
+  }
+
+  private static class MainFuncCompiler extends PostOrderRelNodeVisitor<Void> {
+    private final PrintWriter pw;
+    private static final String TABLESCAN_TMPL = NEW_LINE_JOINER.join(
+        "if (!_sources.containsKey(%2$s))",
+        "    throw new RuntimeException(\"Cannot find table \" + %2$s);",
+        "Stream _%1$s = topo.newStream(%2$s, _sources.get(%2$s).getProducer());",
+        ""
+    );
+
+    private static final String TABLEMODIFY_TMPL = NEW_LINE_JOINER.join(
+        "Stream _%1$s = _%3$s.each(new Fields(%4$s), _sources.get(%2$s).getConsumer(), new Fields(%5$s));",
+        ""
+    );
+    private static final String TRANSFORMATION_TMPL = NEW_LINE_JOINER.join(
+        "Stream _%1$s = _%2$s.each(new Fields(%3$s), %1$s, new Fields(%4$s)).toStream().project(new Fields(%4$s));",
+        ""
+    );
+
+    private MainFuncCompiler(PrintWriter pw) {
+      this.pw = pw;
+    }
+
+    @Override
+    public Void defaultValue(RelNode n) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Void visitFilter(Filter filter) throws Exception {
+      visitTransformation(filter);
+      return null;
+    }
+
+    @Override
+    public Void visitTableModify(TableModify modify) throws Exception {
+      Preconditions.checkArgument(modify.isInsert(), "Only INSERT statements are supported.");
+      String name = RelNodeCompiler.getStageName(modify);
+      RelNode input = modify.getInput();
+      String inputName = RelNodeCompiler.getStageName(input);
+      pw.print(String.format(TABLEMODIFY_TMPL, name, CompilerUtil.escapeJavaString(
+          Joiner.on('.').join(modify.getTable().getQualifiedName()), true),
+          inputName, getFieldString(input), getFieldString(modify)));
+      return null;
+    }
+
+    @Override
+    public Void visitTableScan(TableScan scan) throws Exception {
+      String name = RelNodeCompiler.getStageName(scan);
+      pw.print(String.format(TABLESCAN_TMPL, name, CompilerUtil.escapeJavaString(
+          Joiner.on('.').join(scan.getTable().getQualifiedName()), true)));
+      return null;
+    }
+
+    @Override
+    public Void visitProject(Project project) throws Exception {
+      visitTransformation(project);
+      return null;
+    }
+
+    private static String getFieldString(RelNode n) {
+      int id = n.getId();
+      StringBuilder sb = new StringBuilder();
+      boolean first = true;
+      for (String f: n.getRowType().getFieldNames()) {
+        if (!first) {
+          sb.append(", ");
+        }
+        if (n instanceof TableScan) {
+          sb.append(CompilerUtil.escapeJavaString(f, true));
+        } else {
+          sb.append(CompilerUtil.escapeJavaString(String.format("%d_%s", id, f), true));
+        }
+        first = false;
+      }
+      return sb.toString();
+    }
+
+    private void visitTransformation(SingleRel node) {
+      String name = RelNodeCompiler.getStageName(node);
+      RelNode input = node.getInput();
+      String inputName = RelNodeCompiler.getStageName(input);
+      pw.print(String.format(TRANSFORMATION_TMPL, name, inputName,
+          getFieldString(input), getFieldString(node)));
+    }
+  }
+
+  private void printMain(PrintWriter pw, RelNode root) throws Exception {
+    pw.print(INITIALIZER_PROLOGUE);
+    MainFuncCompiler compiler = new MainFuncCompiler(pw);
+    compiler.traverse(root);
+    pw.print(String.format("  this.outputStream = _%s;\n", RelNodeCompiler.getStageName(root)));
+    pw.print("  return topo; \n}\n");
+  }
+
+  public AbstractTridentProcessor compile(RelNode plan) throws Exception {
+    String javaCode = generateJavaSource(plan);
+    ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
+                                              PACKAGE_NAME + ".TridentProcessor",
+                                              javaCode, null);
+    return (AbstractTridentProcessor) cl.loadClass(PACKAGE_NAME + ".TridentProcessor").newInstance();
+  }
+
+  private static void printEpilogue(
+      PrintWriter pw) throws Exception {
+    pw.print("}\n");
+  }
+
+  private static void printPrologue(PrintWriter pw) {
+    pw.append(PROLOGUE);
+  }
+}
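
To make the templates above concrete: for a query like SELECT ID FROM FOO WHERE ID > 2, the generated build() body would look roughly like the following. This is assembled by hand from TABLESCAN_TMPL and TRANSFORMATION_TMPL; the real stage names and ids come from RelNodeCompiler.getStageName(), so they will differ.

    // Illustrative sketch of generated code, not verbatim compiler output.
    @Override
    public TridentTopology build(Map<String, ISqlTridentDataSource> _sources) {
      TridentTopology topo = new TridentTopology();
      if (!_sources.containsKey("FOO"))
          throw new RuntimeException("Cannot find table " + "FOO");
      Stream _TABLESCAN_1 = topo.newStream("FOO", _sources.get("FOO").getProducer());
      Stream _FILTER_2 = _TABLESCAN_1.each(new Fields("ID"), FILTER_2,
          new Fields("2_ID")).toStream().project(new Fields("2_ID"));
      this.outputStream = _FILTER_2;
      return topo;
    }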

http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
new file mode 100644
index 0000000..1de39d3
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.storm.sql.compiler.ExprCompiler;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+
+import java.io.PrintWriter;
+import java.util.IdentityHashMap;
+import java.util.Map;
+
+/**
+ * Compile RelNodes into individual functions.
+ */
+class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
+  public static final Joiner NEW_LINE_JOINER = Joiner.on('\n');
+
+  private final PrintWriter pw;
+  private final JavaTypeFactory typeFactory;
+  private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+    "  private static final BaseFunction %1$s = ",
+    "    new BaseFunction() {",
+    "    @Override",
+    "    public void execute(TridentTuple tuple, TridentCollector collector) {",
+    "      List<Object> _data = tuple.getValues();",
+    ""
+  );
+
+  private final IdentityHashMap<RelNode, Fields> outputFields = new IdentityHashMap<>();
+
+  RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
+    this.pw = pw;
+    this.typeFactory = typeFactory;
+  }
+
+  @Override
+  public Void visitFilter(Filter filter) throws Exception {
+    beginStage(filter);
+    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+    String r = filter.getCondition().accept(compiler);
+    pw.print(String.format("    if (%s) { collector.emit(_data); }\n", r));
+    endStage();
+    return null;
+  }
+
+  @Override
+  public Void visitProject(Project project) throws Exception {
+    beginStage(project);
+    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+
+    int size = project.getChildExps().size();
+    String[] res = new String[size];
+    for (int i = 0; i < size; ++i) {
+      res[i] = project.getChildExps().get(i).accept(compiler);
+    }
+
+    pw.print(String.format("    collector.emit(new Values(%s));\n",
+                           Joiner.on(',').join(res)));
+    endStage();
+    return null;
+  }
+
+  @Override
+  public Void defaultValue(RelNode n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Void visitTableScan(TableScan scan) throws Exception {
+    return null;
+  }
+
+  @Override
+  public Void visitTableModify(TableModify modify) throws Exception {
+    return null;
+  }
+
+  private void beginStage(RelNode n) {
+    pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
+  }
+
+  private void endStage() {
+    pw.print("  }\n  };\n");
+  }
+
+  static String getStageName(RelNode n) {
+    return n.getClass().getSimpleName().toUpperCase() + "_" + n.getId();
+  }
+}
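
Correspondingly, RelNodeCompiler above turns each Filter or Project node into a static BaseFunction field of the generated class. A hand-written sketch of such a stage for the filter ID > 2; the boolean condition is really emitted by ExprCompiler, so the marked line is only a stand-in:

    // Illustrative sketch of one generated stage, not verbatim compiler output.
    private static final BaseFunction FILTER_2 =
      new BaseFunction() {
      @Override
      public void execute(TridentTuple tuple, TridentCollector collector) {
        List<Object> _data = tuple.getValues();
        boolean r = ((Number) _data.get(0)).intValue() > 2; // stand-in for ExprCompiler output
        if (r) { collector.emit(_data); }
      }
      };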

http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
new file mode 100644
index 0000000..a68ba0c
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import backtype.storm.Config;
+import backtype.storm.ILocalCluster;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.TestUtils.MockSqlTridentDataSource.CollectDataFunction;
+import org.apache.storm.sql.compiler.TestCompilerUtils;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import storm.trident.TridentTopology;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import static org.apache.storm.sql.TestUtils.MockSqlTridentDataSource.CollectDataFunction.*;
+
+public class TestPlanCompiler {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  @Before
+  public void setUp() {
+    getCollectedValues().clear();
+  }
+
+  @Test
+  public void testCompile() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 2;
+    String sql = "SELECT ID FROM FOO WHERE ID > 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    PlanCompiler compiler = new PlanCompiler(typeFactory);
+    final AbstractTridentProcessor proc = compiler.compile(state.tree());
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, getCollectedValues().toArray());
+  }
+
+  @Test
+  public void testInsert() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 1;
+    String sql = "INSERT INTO BAR SELECT ID FROM FOO WHERE ID > 3";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    PlanCompiler compiler = new PlanCompiler(typeFactory);
+    final AbstractTridentProcessor proc = compiler.compile(state.tree());
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    data.put("BAR", new TestUtils.MockSqlTridentDataSource());
+    final TridentTopology topo = proc.build(data);
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    Assert.assertArrayEquals(new Values[] { new Values(4)}, getCollectedValues().toArray());
+  }
+
+  private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc,
+                                  TridentTopology topo) throws Exception {
+    final Config conf = new Config();
+    conf.setMaxSpoutPending(20);
+
+    ILocalCluster cluster = new LocalCluster();
+    StormTopology stormTopo = topo.build();
+    try {
+      Utils.setClassLoaderForJavaDeSerialize(proc.getClass().getClassLoader());
+      cluster.submitTopology("storm-sql", conf, stormTopo);
+      waitForCompletion(1000 * 1000, new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          return getCollectedValues().size() < expectedValueSize;
+        }
+      });
+    } finally {
+      Utils.resetClassLoaderForJavaDeSerialize();
+      cluster.shutdown();
+    }
+  }
+
+  private void waitForCompletion(long timeout, Callable<Boolean> cond) throws Exception {
+    long start = TestUtils.monotonicNow();
+    while (TestUtils.monotonicNow() - start < timeout && cond.call()) {
+      Thread.sleep(100);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
index a8ba4dc..eb6580a 100644
--- a/external/sql/storm-sql-runtime/pom.xml
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -60,5 +60,18 @@
     <build>
       <sourceDirectory>src/jvm</sourceDirectory>
       <testSourceDirectory>src/test</testSourceDirectory>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-jar-plugin</artifactId>
+          <executions>
+            <execution>
+              <goals>
+                <goal>test-jar</goal>
+              </goals>
+            </execution>
+          </executions>
+        </plugin>
+      </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
index 4b2a915..d9e1db7 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
@@ -19,11 +19,12 @@ package org.apache.storm.sql.runtime;
 
 import storm.trident.operation.Function;
 import storm.trident.spout.IBatchSpout;
+import storm.trident.spout.ITridentDataSource;
 
 /**
 * An ISqlTridentDataSource specifies how an external data source produces and consumes data.
  */
 public interface ISqlTridentDataSource {
-  IBatchSpout getProducer();
+  ITridentDataSource getProducer();
   Function getConsumer();
 }
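
Widening getProducer() from IBatchSpout to the ITridentDataSource marker interface is source-compatible for existing implementations, because IBatchSpout extends ITridentDataSource; that is also why MockSqlTridentDataSource below can keep returning an IBatchSpout via a covariant override. A sketch (MyDataSource, MyBatchSpout, and MyWriteFunction are hypothetical names):

    // Sketch only: an existing implementation keeps compiling unchanged.
    public class MyDataSource implements ISqlTridentDataSource {
      @Override
      public IBatchSpout getProducer() { return new MyBatchSpout(); } // covariant return
      @Override
      public Function getConsumer() { return new MyWriteFunction(); }
    }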

http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
new file mode 100644
index 0000000..7faa7e4
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.trident;
+
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import storm.trident.Stream;
+import storm.trident.TridentTopology;
+
+import java.util.Map;
+
+public abstract class AbstractTridentProcessor {
+  protected Stream outputStream;
+  /**
+   * @return the output stream of the SQL statement.
+   */
+  public Stream outputStream() {
+    return outputStream;
+  }
+
+  /**
+   * Construct the Trident topology based on the SQL statement.
+   * @param sources the data sources.
+   */
+  public abstract TridentTopology build(Map<String, ISqlTridentDataSource> sources);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 82347ef..46aac4a 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -19,13 +19,22 @@
  */
 package org.apache.storm.sql;
 
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import org.apache.storm.sql.runtime.ChannelContext;
 import org.apache.storm.sql.runtime.ChannelHandler;
 import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.Function;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+import storm.trident.tuple.TridentTuple;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class TestUtils {
   public static class MockDataSource implements DataSource {
@@ -46,6 +55,80 @@ public class TestUtils {
     }
   }
 
+  public static class MockSqlTridentDataSource implements ISqlTridentDataSource {
+    @Override
+    public IBatchSpout getProducer() {
+      return new MockSpout();
+    }
+
+    @Override
+    public Function getConsumer() {
+      return new CollectDataFunction();
+    }
+
+    public static class CollectDataFunction extends BaseFunction {
+      /**
+       * Collect all values into a static variable, since instances go through serialization and deserialization.
+       */
+      private static final List<List<Object>> VALUES = new ArrayList<>();
+      public static List<List<Object>> getCollectedValues() {
+        return VALUES;
+      }
+
+      @Override
+      public void execute(TridentTuple tuple, TridentCollector collector) {
+        VALUES.add(tuple.getValues());
+      }
+    }
+
+    private static class MockSpout implements IBatchSpout {
+      private final ArrayList<Values> RECORDS = new ArrayList<>();
+      private final Fields OUTPUT_FIELDS = new Fields("ID");
+
+      public MockSpout() {
+        for (int i = 0; i < 5; ++i) {
+          RECORDS.add(new Values(i));
+        }
+      }
+
+      private boolean emitted = false;
+
+      @Override
+      public void open(Map conf, TopologyContext context) {
+      }
+
+      @Override
+      public void emitBatch(long batchId, TridentCollector collector) {
+        if (emitted) {
+          return;
+        }
+
+        for (Values r : RECORDS) {
+          collector.emit(r);
+        }
+        emitted = true;
+      }
+
+      @Override
+      public void ack(long batchId) {
+      }
+
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public Map<String, Object> getComponentConfiguration() {
+        return null;
+      }
+
+      @Override
+      public Fields getOutputFields() {
+        return OUTPUT_FIELDS;
+      }
+    }
+  }
+
   public static class CollectDataChannelHandler implements ChannelHandler {
     private final List<Values> values;
 
@@ -66,4 +149,9 @@ public class TestUtils {
       throw new RuntimeException(cause);
     }
   }
+
+  public static long monotonicNow() {
+    final long NANOSECONDS_PER_MILLISECOND = 1000000;
+    return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
+  }
 }