You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/10/27 18:08:12 UTC

[8/9] storm git commit: Implement compiler for relation nodes.

Implement compiler for relation nodes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54723fac
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54723fac
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54723fac

Branch: refs/heads/STORM-1040
Commit: 54723fac866fcc128c27427d64661f06275668e7
Parents: 3dde1d6
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Oct 22 16:45:20 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Oct 26 10:04:16 2015 -0700

----------------------------------------------------------------------
 .../storm/sql/compiler/RelNodeCompiler.java     | 107 +++++++++++++++++++
 .../storm/sql/compiler/TestRelNodeCompiler.java |  70 ++++++++++++
 2 files changed, 177 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/54723fac/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
new file mode 100644
index 0000000..0550035
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+
+/** Compiles each visited RelNode into a private Java method printed to pw. */
+class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
+  private final PrintWriter pw;
+  private final JavaTypeFactory typeFactory;
+  private final Set<String> referredTables = new TreeSet<>();
+
+  RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
+    this.pw = pw;
+    this.typeFactory = typeFactory;
+  }
+
+  /** Returns a read-only view of the table names added by visitTableScan. */
+  public Set<String> getReferredTables() {
+    return Collections.unmodifiableSet(referredTables);
+  }
+
+  /** Emits a method returning _data when the condition holds, else null. */
+  @Override
+  Void visitFilter(Filter filter) throws Exception {
+    beginFunction(filter);
+    pw.print("  if (_data == null) return null;\n");
+    ExprCompiler exprs = new ExprCompiler(pw, typeFactory);
+    String condition = filter.getCondition().accept(exprs);
+    pw.print("  return " + condition + " ? _data : null;\n");
+    endFunction();
+    return null;
+  }
+
+  /** Emits a method returning a new Values of the compiled projections. */
+  @Override
+  Void visitProject(Project project) throws Exception {
+    beginFunction(project);
+    pw.print("  if (_data == null) return null;\n");
+    ExprCompiler exprs = new ExprCompiler(pw, typeFactory);
+    String[] columns = new String[project.getChildExps().size()];
+    for (int i = 0; i < columns.length; ++i) {
+      columns[i] = project.getChildExps().get(i).accept(exprs);
+    }
+    pw.print("  return new Values(" + Joiner.on(',').join(columns) + ");\n");
+    endFunction();
+    return null;
+  }
+
+  /** Always throws; only filter, project and scan visits are overridden. */
+  @Override
+  Void defaultValue(RelNode n) {
+    throw new UnsupportedOperationException("Unsupported RelNode: "
+        + (n == null ? "null" : n.getClass().getName()));
+  }
+
+  /** Records the joined table name and emits a method reading that table. */
+  @Override
+  Void visitTableScan(TableScan scan) throws Exception {
+    String tableName = Joiner.on('_').join(scan.getTable().getQualifiedName());
+    referredTables.add(tableName);
+    beginFunction(scan);
+    pw.print("  return _datasources[TABLE_" + tableName + "].next();\n");
+    endFunction();
+    return null;
+  }
+
+  /** Opens the method for n; _data is a parameter only if n has inputs. */
+  private void beginFunction(RelNode n) {
+    pw.print("private Values " + getFunctionName(n) + "("
+        + (n.getInputs().isEmpty() ? "" : "Values _data") + ") {\n");
+  }
+
+  private void endFunction() {
+    pw.print("}\n"); // closes the method opened by beginFunction
+  }
+
+  /** Returns the method name for n: its simple class name, "_", its id. */
+  static String getFunctionName(RelNode n) {
+    return n.getClass().getSimpleName() + "_" + n.getId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/54723fac/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
new file mode 100644
index 0000000..61f5409
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+/** Checks the code RelNodeCompiler emits for a scan, a filter and a project. */
+public class TestRelNodeCompiler {
+  @Test
+  public void testFilter() throws Exception {
+    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(
+        "SELECT ID + 1 FROM FOO WHERE ID > 3");
+    JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+        RelDataTypeSystem.DEFAULT);
+
+    // The casts below expect a Project over a Filter over a TableScan.
+    LogicalProject projectNode = (LogicalProject) state.tree;
+    LogicalFilter filterNode = (LogicalFilter) projectNode.getInput();
+    TableScan scanNode = (TableScan) filterNode.getInput();
+
+    // Table scan: the emitted method must index the data source of FOO.
+    try (StringWriter out = new StringWriter();
+         PrintWriter writer = new PrintWriter(out)) {
+      new RelNodeCompiler(writer, typeFactory).visitTableScan(scanNode);
+      writer.flush();
+      Assert.assertTrue(out.toString().contains("_datasources[TABLE_FOO]"));
+    }
+
+    // Filter: the emitted method must contain the predicate text "t0 > 3".
+    try (StringWriter out = new StringWriter();
+         PrintWriter writer = new PrintWriter(out)) {
+      new RelNodeCompiler(writer, typeFactory).visitFilter(filterNode);
+      writer.flush();
+      Assert.assertTrue(out.toString().contains("t0 > 3"));
+    }
+
+    // Project: the emitted method must contain the expression text "t0 + 1".
+    try (StringWriter out = new StringWriter();
+         PrintWriter writer = new PrintWriter(out)) {
+      new RelNodeCompiler(writer, typeFactory).visitProject(projectNode);
+      writer.flush();
+      Assert.assertTrue(out.toString().contains("t0 + 1"));
+    }
+  }
+}