You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/08 19:10:00 UTC
[14/20] storm git commit: [StormSQL] STORM-1181. Compile SQLs into
Trident topology and execute them in LocalCluster.
[StormSQL] STORM-1181. Compile SQLs into Trident topology and execute them in LocalCluster.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7e378c65
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7e378c65
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7e378c65
Branch: refs/heads/master
Commit: 7e378c65bc18ebcca3e500ea70f6da3f376003c3
Parents: 82f10eb
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Nov 16 14:48:43 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Dec 4 11:09:28 2015 -0800
----------------------------------------------------------------------
.../compiler/backends/trident/PlanCompiler.java | 194 +++++++++++++++++++
.../backends/trident/RelNodeCompiler.java | 116 +++++++++++
.../backends/trident/TestPlanCompiler.java | 116 +++++++++++
external/sql/storm-sql-runtime/pom.xml | 13 ++
.../sql/runtime/ISqlTridentDataSource.java | 3 +-
.../trident/AbstractTridentProcessor.java | 43 ++++
.../test/org/apache/storm/sql/TestUtils.java | 88 +++++++++
7 files changed, 572 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
new file mode 100644
index 0000000..f8bfd12
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
@@ -0,0 +1,194 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.compiler.CompilerUtil;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class PlanCompiler { // Turns a Calcite RelNode plan into Java source for a TridentProcessor class, then compiles and instantiates it.
+ private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
+ private static final String PACKAGE_NAME = "org.apache.storm.sql.generated"; // package of the generated class
+ private static final String PROLOGUE = NEW_LINE_JOINER.join( // head of the generated file: package line, imports, class declaration
+ "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
+ "import java.util.List;",
+ "import java.util.Map;",
+ "import backtype.storm.tuple.Fields;",
+ "import backtype.storm.tuple.Values;",
+ "import org.apache.storm.sql.runtime.ISqlTridentDataSource;",
+ "import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;",
+ "import storm.trident.Stream;",
+ "import storm.trident.TridentTopology;",
+ "import storm.trident.fluent.IAggregatableStream;",
+ "import storm.trident.operation.TridentCollector;",
+ "import storm.trident.operation.BaseFunction;",
+ "import storm.trident.spout.IBatchSpout;",
+ "import storm.trident.tuple.TridentTuple;",
+ "",
+ "public final class TridentProcessor extends AbstractTridentProcessor {",
+ "");
+ private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join( // opening of the generated build() method; printMain() writes its body and closing brace
+ " @Override",
+ " public TridentTopology build(Map<String, ISqlTridentDataSource> _sources) {",
+ " TridentTopology topo = new TridentTopology();",
+ ""
+ );
+
+ private final JavaTypeFactory typeFactory; // handed to RelNodeCompiler when generating stage code
+ public PlanCompiler(JavaTypeFactory typeFactory) {
+ this.typeFactory = typeFactory;
+ }
+
+ private String generateJavaSource(RelNode root) throws Exception { // returns the complete source text of the generated class
+ StringWriter sw = new StringWriter();
+ try (PrintWriter pw = new PrintWriter(sw)) {
+ RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+ printPrologue(pw); // 1. package, imports, class header
+ compiler.traverse(root); // 2. one BaseFunction field per Filter/Project stage
+ printMain(pw, root); // 3. build() method wiring the stages into streams
+ printEpilogue(pw); // 4. closing brace of the class
+ }
+ return sw.toString();
+ }
+
+ private static class MainFuncCompiler extends PostOrderRelNodeVisitor<Void> { // writes the statements inside build() that connect the stages
+ private final PrintWriter pw;
+ private static final String TABLESCAN_TMPL = NEW_LINE_JOINER.join( // %1$s = stage name, %2$s = escaped qualified table name
+ "if (!_sources.containsKey(%2$s))",
+ " throw new RuntimeException(\"Cannot find table \" + %2$s);",
+ "Stream _%1$s = topo.newStream(%2$s, _sources.get(%2$s).getProducer());",
+ ""
+ );
+
+ private static final String TABLEMODIFY_TMPL = NEW_LINE_JOINER.join( // %1$s = stage name, %2$s = escaped target table name, %3$s = input stage name, %4$s = input fields, %5$s = output fields
+ "Stream _%1$s = _%3$s.each(new Fields(%4$s), _sources.get(%2$s).getConsumer(), new Fields(%5$s));", // NOTE(review): no containsKey check here, unlike TABLESCAN_TMPL
+ ""
+ );
+ private static final String TRANSFORMATION_TMPL = NEW_LINE_JOINER.join( // %1$s = stage name (also the function field name), %2$s = input stage name, %3$s = input fields, %4$s = output fields
+ "Stream _%1$s = _%2$s.each(new Fields(%3$s), %1$s, new Fields(%4$s)).toStream().project(new Fields(%4$s));",
+ ""
+ );
+
+ private MainFuncCompiler(PrintWriter pw) {
+ this.pw = pw;
+ }
+
+ @Override
+ public Void defaultValue(RelNode n) { // presumably the fallback for node types without a dedicated visit method - confirm in PostOrderRelNodeVisitor
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Void visitFilter(Filter filter) throws Exception {
+ visitTransformation(filter); // Filter and Project share the same wiring template
+ return null;
+ }
+
+ @Override
+ public Void visitTableModify(TableModify modify) throws Exception {
+ Preconditions.checkArgument(modify.isInsert(), "Only INSERT statement is supported."); // any non-INSERT modification is rejected
+ String name = RelNodeCompiler.getStageName(modify);
+ RelNode input = modify.getInput();
+ String inputName = RelNodeCompiler.getStageName(input);
+ pw.print(String.format(TABLEMODIFY_TMPL, name, CompilerUtil.escapeJavaString(
+ Joiner.on('.').join(modify.getTable().getQualifiedName()), true), // qualified name parts joined with '.'
+ inputName, getFieldString(input), getFieldString(modify)));
+ return null;
+ }
+
+ @Override
+ public Void visitTableScan(TableScan scan) throws Exception {
+ String name = RelNodeCompiler.getStageName(scan);
+ pw.print(String.format(TABLESCAN_TMPL, name, CompilerUtil.escapeJavaString(
+ Joiner.on('.').join(scan.getTable().getQualifiedName()), true))); // same table-name form as in visitTableModify
+ return null;
+ }
+
+ @Override
+ public Void visitProject(Project project) throws Exception {
+ visitTransformation(project);
+ return null;
+ }
+
+ private static String getFieldString(RelNode n) { // builds the ", "-separated argument list placed inside new Fields(...)
+ int id = n.getId();
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (String f: n.getRowType().getFieldNames()) {
+ if (!first) {
+ sb.append(", ");
+ }
+ if (n instanceof TableScan) {
+ sb.append(CompilerUtil.escapeJavaString(f, true)); // scan fields keep their plain names
+ } else {
+ sb.append(CompilerUtil.escapeJavaString(String.format("%d_%s", id, f), true)); // other nodes prefix each field with the node id: "<id>_<field>"
+ }
+ first = false;
+ }
+ return sb.toString();
+ }
+
+ private void visitTransformation(SingleRel node) { // shared by visitFilter and visitProject
+ String name = RelNodeCompiler.getStageName(node);
+ RelNode input = node.getInput();
+ String inputName = RelNodeCompiler.getStageName(input);
+ pw.print(String.format(TRANSFORMATION_TMPL, name, inputName,
+ getFieldString(input), getFieldString(node)));
+ }
+ }
+
+ private void printMain(PrintWriter pw, RelNode root) throws Exception { // writes the whole generated build() method
+ pw.print(INITIALIZER_PROLOGUE);
+ MainFuncCompiler compiler = new MainFuncCompiler(pw);
+ compiler.traverse(root);
+ pw.print(String.format(" this.outputStream = _%s;\n", RelNodeCompiler.getStageName(root))); // the root stage's stream becomes the processor's output stream
+ pw.print(" return topo; \n}\n");
+ }
+
+ public AbstractTridentProcessor compile(RelNode plan) throws Exception { // generates the source, compiles it through CompilingClassLoader and returns a new instance
+ String javaCode = generateJavaSource(plan);
+ ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(),
+ PACKAGE_NAME + ".TridentProcessor",
+ javaCode, null);
+ return (AbstractTridentProcessor) cl.loadClass(PACKAGE_NAME + ".TridentProcessor").newInstance(); // instantiated through the no-argument constructor
+ }
+
+ private static void printEpilogue(
+ PrintWriter pw) throws Exception {
+ pw.print("}\n"); // closes the generated class
+ }
+
+ private static void printPrologue(PrintWriter pw) {
+ pw.append(PROLOGUE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
new file mode 100644
index 0000000..1de39d3
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
@@ -0,0 +1,116 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.storm.sql.compiler.ExprCompiler;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+
+import java.io.PrintWriter;
+import java.util.IdentityHashMap;
+import java.util.Map;
+
+/**
+ * Compile Filter and Project RelNodes into individual BaseFunction fields written to a PrintWriter.
+ */
+class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
+ public static final Joiner NEW_LINE_JOINER = Joiner.on('\n'); // final: a shared constant must not be reassignable
+
+ private final PrintWriter pw; // destination of the generated source
+ private final JavaTypeFactory typeFactory; // passed to every ExprCompiler
+ private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join( // opening of one generated stage; %1$s = stage name
+ " private static final BaseFunction %1$s = ",
+ " new BaseFunction() {",
+ " @Override",
+ " public void execute(TridentTuple tuple, TridentCollector collector) {",
+ " List<Object> _data = tuple.getValues();",
+ ""
+ );
+
+ private final IdentityHashMap<RelNode, Fields> outputFields = new IdentityHashMap<>(); // NOTE(review): never read or written in this class
+
+ RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
+ this.pw = pw;
+ this.typeFactory = typeFactory;
+ }
+
+ @Override
+ public Void visitFilter(Filter filter) throws Exception { // generated stage re-emits the input values when the condition holds
+ beginStage(filter);
+ ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+ String r = filter.getCondition().accept(compiler); // r is the Java expression text for the condition
+ pw.print(String.format(" if (%s) { collector.emit(_data); }\n", r));
+ endStage();
+ return null;
+ }
+
+ @Override
+ public Void visitProject(Project project) throws Exception { // generated stage emits one value per projected expression
+ beginStage(project);
+ ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+
+ int size = project.getChildExps().size();
+ String[] res = new String[size];
+ for (int i = 0; i < size; ++i) {
+ res[i] = project.getChildExps().get(i).accept(compiler);
+ }
+
+ pw.print(String.format(" collector.emit(new Values(%s));\n",
+ Joiner.on(',').join(res)));
+ endStage();
+ return null;
+ }
+
+ @Override
+ public Void defaultValue(RelNode n) { // presumably the fallback for unsupported node types - confirm in PostOrderRelNodeVisitor
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Void visitTableScan(TableScan scan) throws Exception { // no function is generated for a scan
+ return null;
+ }
+
+ @Override
+ public Void visitTableModify(TableModify modify) throws Exception { // no function is generated for a table modification
+ return null;
+ }
+
+ private void beginStage(RelNode n) {
+ pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
+ }
+
+ private void endStage() {
+ pw.print(" }\n };\n"); // closes execute() and the anonymous BaseFunction
+ }
+
+ static String getStageName(RelNode n) { // "<SIMPLE CLASS NAME IN UPPER CASE>_<node id>"
+ return n.getClass().getSimpleName().toUpperCase(java.util.Locale.ROOT) + "_" + n.getId(); // Locale.ROOT keeps generated identifiers independent of the default locale
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
new file mode 100644
index 0000000..a68ba0c
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -0,0 +1,116 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import backtype.storm.Config;
+import backtype.storm.ILocalCluster;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.TestUtils.MockSqlTridentDataSource.CollectDataFunction;
+import org.apache.storm.sql.compiler.TestCompilerUtils;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import storm.trident.TridentTopology;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import static org.apache.storm.sql.TestUtils.MockSqlTridentDataSource.CollectDataFunction.*;
+
+public class TestPlanCompiler { // runs compiled SQL plans on a LocalCluster and checks the values collected by CollectDataFunction
+ private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+
+ @Before
+ public void setUp() {
+ getCollectedValues().clear(); // the collected values are static, so reset them before each test
+ }
+
+ @Test
+ public void testCompile() throws Exception { // SELECT with a filter; expects the rows with ID 3 and 4
+ final int EXPECTED_VALUE_SIZE = 2;
+ String sql = "SELECT ID FROM FOO WHERE ID > 2";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+ PlanCompiler compiler = new PlanCompiler(typeFactory);
+ final AbstractTridentProcessor proc = compiler.compile(state.tree());
+ final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+ data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+ final TridentTopology topo = proc.build(data);
+ Fields f = proc.outputStream().getOutputFields();
+ proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); // a plain SELECT has no sink, so attach the collector to the output stream here
+ runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+ Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, getCollectedValues().toArray());
+ }
+
+ @Test
+ public void testInsert() throws Exception { // INSERT ... SELECT; expects only the row with ID 4
+ final int EXPECTED_VALUE_SIZE = 1;
+ String sql = "INSERT INTO BAR SELECT ID FROM FOO WHERE ID > 3";
+ TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+ PlanCompiler compiler = new PlanCompiler(typeFactory);
+ final AbstractTridentProcessor proc = compiler.compile(state.tree());
+ final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+ data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+ data.put("BAR", new TestUtils.MockSqlTridentDataSource()); // the mock's consumer is a CollectDataFunction, so no extra collector is attached
+ final TridentTopology topo = proc.build(data);
+ runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+ Assert.assertArrayEquals(new Values[] { new Values(4)}, getCollectedValues().toArray());
+ }
+
+ private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc,
+ TridentTopology topo) throws Exception { // submits the topology and waits until enough values are collected or the timeout passes
+ final Config conf = new Config();
+ conf.setMaxSpoutPending(20);
+
+ ILocalCluster cluster = new LocalCluster();
+ StormTopology stormTopo = topo.build();
+ try {
+ Utils.setClassLoaderForJavaDeSerialize(proc.getClass().getClassLoader()); // presumably needed so the generated class can be deserialized - TODO confirm
+ cluster.submitTopology("storm-sql", conf, stormTopo);
+ waitForCompletion(1000 * 1000, new Callable<Boolean>() { // timeout in milliseconds (compared against TestUtils.monotonicNow())
+ @Override
+ public Boolean call() throws Exception {
+ return getCollectedValues().size() < expectedValueSize; // true means "keep waiting"
+ }
+ });
+ } finally {
+ Utils.resetClassLoaderForJavaDeSerialize();
+ cluster.shutdown();
+ }
+ }
+
+ private void waitForCompletion(long timeout, Callable<Boolean> cond) throws Exception { // polls every 100 ms; returns without error on timeout, leaving failure to the caller's assertion
+ long start = TestUtils.monotonicNow();
+ while (TestUtils.monotonicNow() - start < timeout && cond.call()) {
+ Thread.sleep(100);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
index a8ba4dc..eb6580a 100644
--- a/external/sql/storm-sql-runtime/pom.xml
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -60,5 +60,18 @@
<build>
<sourceDirectory>src/jvm</sourceDirectory>
<testSourceDirectory>src/test</testSourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
index 4b2a915..d9e1db7 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
@@ -19,11 +19,12 @@ package org.apache.storm.sql.runtime;
import storm.trident.operation.Function;
import storm.trident.spout.IBatchSpout;
+import storm.trident.spout.ITridentDataSource;
/**
* A ISqlTridentDataSource specifies how an external data source produces and consumes data.
*/
public interface ISqlTridentDataSource {
- IBatchSpout getProducer();
+ ITridentDataSource getProducer();
Function getConsumer();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
new file mode 100644
index 0000000..7faa7e4
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * * <p>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.trident;
+
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import storm.trident.Stream;
+import storm.trident.TridentTopology;
+
+import java.util.Map;
+
+public abstract class AbstractTridentProcessor { // base class for processors that build a Trident topology from SQL
+ protected Stream outputStream; // nothing in this class assigns it; it stays null unless a subclass sets it
+ /**
+ * @return the output stream of the SQL, or null if no subclass has assigned the outputStream field
+ */
+ public Stream outputStream() {
+ return outputStream;
+ }
+
+ /**
+ * Construct the trident topology based on the SQL.
+ * @param sources the data sources, keyed by String.
+ */
+ public abstract TridentTopology build(Map<String, ISqlTridentDataSource> sources);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7e378c65/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 82347ef..46aac4a 100644
--- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -19,13 +19,22 @@
*/
package org.apache.storm.sql;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.apache.storm.sql.runtime.ChannelContext;
import org.apache.storm.sql.runtime.ChannelHandler;
import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.Function;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+import storm.trident.tuple.TridentTuple;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
public class TestUtils {
public static class MockDataSource implements DataSource {
@@ -46,6 +55,80 @@ public class TestUtils {
}
}
+ public static class MockSqlTridentDataSource implements ISqlTridentDataSource { // test data source: produces five rows and collects whatever it consumes
+ @Override
+ public IBatchSpout getProducer() {
+ return new MockSpout(); // a fresh spout on every call
+ }
+
+ @Override
+ public Function getConsumer() {
+ return new CollectDataFunction(); // a fresh function, but all instances share one static list
+ }
+
+ public static class CollectDataFunction extends BaseFunction {
+ /**
+ * Collect all values in a static variable as the instance will go through serialization and deserialization.
+ */
+ private transient static final List<List<Object> > VALUES = new ArrayList<>(); // plain ArrayList; this class adds no synchronization around it
+ public static List<List<Object>> getCollectedValues() {
+ return VALUES; // returns the live list, not a copy
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ VALUES.add(tuple.getValues()); // records the tuple and emits nothing
+ }
+ }
+
+ private static class MockSpout implements IBatchSpout { // emits Values(0) .. Values(4) under the single field "ID", once
+ private final ArrayList<Values> RECORDS = new ArrayList<>();
+ private final Fields OUTPUT_FIELDS = new Fields("ID");
+
+ public MockSpout() {
+ for (int i = 0; i < 5; ++i) {
+ RECORDS.add(new Values(i));
+ }
+ }
+
+ private boolean emitted = false; // set after the first batch so later batches are empty
+
+ @Override
+ public void open(Map conf, TopologyContext context) {
+ }
+
+ @Override
+ public void emitBatch(long batchId, TridentCollector collector) {
+ if (emitted) {
+ return;
+ }
+
+ for (Values r : RECORDS) {
+ collector.emit(r);
+ }
+ emitted = true;
+ }
+
+ @Override
+ public void ack(long batchId) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null; // no component-specific configuration is supplied
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return OUTPUT_FIELDS;
+ }
+ }
+ }
+
public static class CollectDataChannelHandler implements ChannelHandler {
private final List<Values> values;
@@ -66,4 +149,9 @@ public class TestUtils {
throw new RuntimeException(cause);
}
}
+
+ public static long monotonicNow() { // System.nanoTime() converted to milliseconds; only differences between two calls are meaningful
+ final long NANOSECONDS_PER_MILLISECOND = 1000000;
+ return System.nanoTime() / NANOSECONDS_PER_MILLISECOND; // integer division truncates to whole milliseconds
+ }
}