You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/10/27 18:08:12 UTC
[8/9] storm git commit: Implement compiler for relation nodes.
Implement compiler for relation nodes.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54723fac
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54723fac
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54723fac
Branch: refs/heads/STORM-1040
Commit: 54723fac866fcc128c27427d64661f06275668e7
Parents: 3dde1d6
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Oct 22 16:45:20 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Oct 26 10:04:16 2015 -0700
----------------------------------------------------------------------
.../storm/sql/compiler/RelNodeCompiler.java | 107 +++++++++++++++++++
.../storm/sql/compiler/TestRelNodeCompiler.java | 70 ++++++++++++
2 files changed, 177 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/54723fac/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
new file mode 100644
index 0000000..0550035
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Compile RelNodes into individual functions.
+ */
+class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
+ private final PrintWriter pw;
+ private final JavaTypeFactory typeFactory;
+
+ public Set<String> getReferredTables() {
+ return Collections.unmodifiableSet(referredTables);
+ }
+
+ private final Set<String> referredTables = new TreeSet<>();
+
+ RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
+ this.pw = pw;
+ this.typeFactory = typeFactory;
+ }
+
+ @Override
+ Void visitFilter(Filter filter) throws Exception {
+ beginFunction(filter);
+ pw.print(" if (_data == null) return null;\n");
+ ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+ String r = filter.getCondition().accept(compiler);
+ pw.print(String.format(" return %s ? _data : null;\n", r));
+ endFunction();
+ return null;
+ }
+
+ @Override
+ Void visitProject(Project project) throws Exception {
+ beginFunction(project);
+ pw.print(" if (_data == null) return null;\n");
+ ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+
+ int size = project.getChildExps().size();
+ String[] res = new String[size];
+ for (int i = 0; i < size; ++i) {
+ res[i] = project.getChildExps().get(i).accept(compiler);
+ }
+
+ pw.print(String.format(" return new Values(%s);\n", Joiner.on(',').join
+ (res)));
+ endFunction();
+ return null;
+ }
+
+ @Override
+ Void defaultValue(RelNode n) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ Void visitTableScan(TableScan scan) throws Exception {
+ String tableName = Joiner.on('_').join(scan.getTable().getQualifiedName());
+ referredTables.add(tableName);
+ beginFunction(scan);
+ pw.print(String.format(" return _datasources[TABLE_%s].next();\n",
+ tableName));
+ endFunction();
+ return null;
+ }
+
+ private void beginFunction(RelNode n) {
+ pw.print(String.format("private Values %s(%s) {\n", getFunctionName(n), n
+ .getInputs().isEmpty() ? "" : "Values _data"));
+ }
+
+ private void endFunction() {
+ pw.print("}\n");
+ }
+
+ static String getFunctionName(RelNode n) {
+ return n.getClass().getSimpleName() + "_" + n.getId();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/54723fac/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
new file mode 100644
index 0000000..61f5409
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class TestRelNodeCompiler {
+ @Test
+ public void testFilter() throws Exception {
+ String sql = "SELECT ID + 1 FROM FOO WHERE ID > 3";
+ TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+ JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+ LogicalProject project = (LogicalProject) state.tree;
+ LogicalFilter filter = (LogicalFilter) project.getInput();
+ TableScan scan = (TableScan) filter.getInput();
+
+ try (StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw)
+ ) {
+ RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+ compiler.visitTableScan(scan);
+ pw.flush();
+ Assert.assertTrue(sw.toString().contains("_datasources[TABLE_FOO]"));
+ }
+
+ try (StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw)
+ ) {
+ RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+ compiler.visitFilter(filter);
+ pw.flush();
+ Assert.assertTrue(sw.toString().contains("t0 > 3"));
+ }
+
+ try (StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw)
+ ) {
+ RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
+ compiler.visitProject(project);
+ pw.flush();
+ Assert.assertTrue(sw.toString().contains("t0 + 1"));
+ }
+ }
+}