You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/10 19:39:50 UTC

[1/3] storm git commit: Implement IS operators in StormSQL.

Repository: storm
Updated Branches:
  refs/heads/STORM-1040 8dfd4a823 -> 1d0155aa8


Implement IS operators in StormSQL.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/183b5ac9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/183b5ac9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/183b5ac9

Branch: refs/heads/STORM-1040
Commit: 183b5ac926ad25b1c504c8822ebbdd236ecd0963
Parents: 8dfd4a8
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Nov 3 15:06:22 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Nov 3 15:28:33 2015 -0800

----------------------------------------------------------------------
 .../apache/storm/sql/compiler/ExprCompiler.java | 62 +++++++++++++++
 .../storm/sql/compiler/TestExprSemantic.java    | 82 ++++++++++++++++++++
 2 files changed, 144 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/183b5ac9/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index 0d35593..fd3a614 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -18,6 +18,7 @@
 package org.apache.storm.sql.compiler;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
@@ -47,6 +48,12 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.DIVIDE;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.DIVIDE_INTEGER;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_FALSE;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NOT_FALSE;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NOT_NULL;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NOT_TRUE;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_NULL;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_TRUE;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.LESS_THAN;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.LESS_THAN_OR_EQUAL;
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.MINUS;
@@ -189,6 +196,12 @@ class ExprCompiler implements RexVisitor<String> {
           .put(infixBinary(MULTIPLY, "*"))
           .put(infixBinary(DIVIDE, "/"))
           .put(infixBinary(DIVIDE_INTEGER, "/"))
+          .put(expect(IS_NULL, null))
+          .put(expectNot(IS_NOT_NULL, null))
+          .put(expect(IS_TRUE, true))
+          .put(expectNot(IS_NOT_TRUE, true))
+          .put(expect(IS_FALSE, false))
+          .put(expectNot(IS_NOT_FALSE, false))
           .put(AND, AND_EXPR)
           .put(OR, OR_EXPR)
           .put(NOT, NOT_EXPR);
@@ -223,6 +236,55 @@ class ExprCompiler implements RexVisitor<String> {
       return new AbstractMap.SimpleImmutableEntry<>(op, trans);
     }
 
+    private Map.Entry<SqlOperator, CallExprPrinter> expect(
+        SqlOperator op, final Boolean expect) {
+      return expect0(op, expect, false);
+    }
+
+    private Map.Entry<SqlOperator, CallExprPrinter> expectNot(
+        SqlOperator op, final Boolean expect) {
+      return expect0(op, expect, true);
+    }
+
+    private Map.Entry<SqlOperator, CallExprPrinter> expect0(
+        SqlOperator op, final Boolean expect, final boolean negate) {
+      CallExprPrinter trans = new CallExprPrinter() {
+        @Override
+        public String translate(
+            ExprCompiler compiler, RexCall call) {
+          assert call.getOperands().size() == 1;
+          String val = compiler.reserveName(call);
+          RexNode operand = call.getOperands().get(0);
+          boolean nullable = operand.getType().isNullable();
+          String op = operand.accept(compiler);
+          PrintWriter pw = compiler.pw;
+          if (!nullable) {
+            if (expect == null) {
+              pw.print(String.format("boolean %s = %b;\n", val, !negate));
+            } else {
+              pw.print(String.format("boolean %s = %s == %b;\n", val, op,
+                                     expect ^ negate));
+            }
+          } else {
+            String expr;
+            if (expect == null) {
+              expr = String.format("%s == null", op);
+            } else {
+              expr = String.format("%s == Boolean.%s", op, expect ? "TRUE" :
+                  "FALSE");
+            }
+            if (negate) {
+              expr = String.format("!(%s)", expr);
+            }
+            pw.print(String.format("boolean %s = %s;\n", val, expr));
+          }
+          return val;
+        }
+      };
+      return new AbstractMap.SimpleImmutableEntry<>(op, trans);
+    }
+
+
     private static final CallExprPrinter AND_EXPR = new CallExprPrinter() {
       @Override
       public String translate(

http://git-wip-us.apache.org/repos/asf/storm/blob/183b5ac9/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
new file mode 100644
index 0000000..bda9e00
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import backtype.storm.tuple.Values;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.storm.ChannelHandler;
+import org.apache.storm.sql.storm.DataSource;
+import org.apache.storm.sql.storm.runtime.AbstractValuesProcessor;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestExprSemantic {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  @Test
+  public void testLogicalExpr() throws Exception {
+    Values v = testExpr(
+        Lists.newArrayList("ID > 0 OR ID < 1", "ID > 0 AND ID < 1",
+                           "NOT (ID > 0 AND ID < 1)"),
+        "WHERE ID > 0 AND ID < 2");
+    assertEquals(new Values(true, false, true), v);
+  }
+
+  @Test
+  public void testExpectOperator() throws Exception {
+    Values v = testExpr(
+        Lists.newArrayList("TRUE IS TRUE", "TRUE IS NOT TRUE",
+                           "UNKNOWN IS TRUE", "UNKNOWN IS NOT TRUE",
+                           "TRUE IS FALSE", "UNKNOWN IS NULL",
+                           "UNKNOWN IS NOT NULL"));
+    assertEquals(new Values(true, false, false, true, false, true, false), v);
+  }
+
+  private Values testExpr(List<String> exprs, String additionalCaluse)
+      throws Exception {
+    String sql = "SELECT " + Joiner.on(',').join(exprs) + " FROM FOO";
+    if (additionalCaluse != null) {
+      sql += " " + additionalCaluse;
+    }
+    TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
+    PlanCompiler compiler = new PlanCompiler(typeFactory);
+    AbstractValuesProcessor proc = compiler.compile(state.tree);
+    Map<String, DataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockDataSource());
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    proc.initialize(data, h);
+    return values.get(0);
+  }
+
+  private Values testExpr(List<String> exprs) throws Exception {
+    return testExpr(exprs, null);
+  }
+
+}


[3/3] storm git commit: STORM-1159. Support nullable operations in StormSQL.

Posted by sr...@apache.org.
STORM-1159. Support nullable operations in StormSQL.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1d0155aa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1d0155aa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1d0155aa

Branch: refs/heads/STORM-1040
Commit: 1d0155aa8b9d9cb2a0ccb364252c22a30b28c66f
Parents: 8890de3
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Nov 4 18:36:59 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Thu Nov 5 09:00:34 2015 -0800

----------------------------------------------------------------------
 .../apache/storm/sql/compiler/ExprCompiler.java | 54 ++++++++++++++------
 .../storm/sql/compiler/TestExprCompiler.java    | 10 ++--
 .../storm/sql/compiler/TestExprSemantic.java    |  9 ++++
 .../storm/sql/compiler/TestRelNodeCompiler.java |  7 ++-
 4 files changed, 59 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1d0155aa/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index db97d43..6617ef6 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -17,10 +17,7 @@
  */
 package org.apache.storm.sql.compiler;
 
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
-import org.apache.calcite.adapter.enumerable.CallImplementor;
-import org.apache.calcite.adapter.enumerable.RexImpTable;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
@@ -42,7 +39,6 @@ import java.io.PrintWriter;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
 import java.util.AbstractMap;
-import java.util.IdentityHashMap;
 import java.util.Map;
 
 import static org.apache.calcite.sql.fun.SqlStdOperatorTable.AND;
@@ -80,7 +76,7 @@ class ExprCompiler implements RexVisitor<String> {
 
   @Override
   public String visitInputRef(RexInputRef rexInputRef) {
-    String name = reserveName(rexInputRef);
+    String name = reserveName();
     String typeName = javaTypeName(rexInputRef);
     pw.print(String.format("%s %s = (%s)(_data.get(%d));\n", typeName, name,
                            typeName, rexInputRef.getIndex()));
@@ -164,9 +160,8 @@ class ExprCompiler implements RexVisitor<String> {
     return ((Class<?>)ty).getCanonicalName();
   }
 
-  private String reserveName(RexNode node) {
-    String name = "t" + ++nameCount;
-    return name;
+  private String reserveName() {
+    return "t" + ++nameCount;
   }
 
   private interface CallExprPrinter {
@@ -223,11 +218,38 @@ class ExprCompiler implements RexVisitor<String> {
             ExprCompiler compiler, RexCall call) {
           int size = call.getOperands().size();
           assert size == 2;
-          String[] ops = new String[size];
-          for (int i = 0; i < size; ++i) {
-            ops[i] = call.getOperands().get(i).accept(compiler);
+          String val = compiler.reserveName();
+          RexNode op0 = call.getOperands().get(0);
+          RexNode op1 = call.getOperands().get(1);
+          boolean lhsNullable = op0.getType().isNullable();
+          boolean rhsNullable = op1.getType().isNullable();
+
+          PrintWriter pw = compiler.pw;
+          String lhs = op0.accept(compiler);
+          pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), val));
+          if (!lhsNullable) {
+            String rhs = op1.accept(compiler);
+            String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", rhs);
+            if (!rhsNullable) {
+              pw.print(String.format("%s = %s;\n", val, calc));
+            } else {
+              pw.print(
+                  String.format("%s = %s == null ? null : (%s);\n",
+                      val, rhs, calc));
+            }
+          } else {
+            pw.print(String.format("if (%2$s == null) { %1$s = null; }\n",
+                val, lhs));
+            pw.print("else {\n");
+            String rhs = op1.accept(compiler);
+            String calc = foldNullExpr(String.format("%s %s %s", lhs, javaOperator, rhs), "null", lhs);
+            if (!rhsNullable) {
+              pw.print(String.format("%s = %s;\n}\n", val, calc));
+            } else {
+              pw.print(String.format("%1$s = %2$s == null ? null : (%3$s);\n}\n", val, rhs, calc));
+            }
           }
-          return String.format("%s %s %s", ops[0], javaOperator, ops[1]);
+          return val;
         }
       };
       return new AbstractMap.SimpleImmutableEntry<>(op, trans);
@@ -250,7 +272,7 @@ class ExprCompiler implements RexVisitor<String> {
         public String translate(
             ExprCompiler compiler, RexCall call) {
           assert call.getOperands().size() == 1;
-          String val = compiler.reserveName(call);
+          String val = compiler.reserveName();
           RexNode operand = call.getOperands().get(0);
           boolean nullable = operand.getType().isNullable();
           String op = operand.accept(compiler);
@@ -289,7 +311,7 @@ class ExprCompiler implements RexVisitor<String> {
       @Override
       public String translate(
           ExprCompiler compiler, RexCall call) {
-        String val = compiler.reserveName(call);
+        String val = compiler.reserveName();
         PrintWriter pw = compiler.pw;
         pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call),
                                val));
@@ -331,7 +353,7 @@ class ExprCompiler implements RexVisitor<String> {
       @Override
       public String translate(
           ExprCompiler compiler, RexCall call) {
-        String val = compiler.reserveName(call);
+        String val = compiler.reserveName();
         PrintWriter pw = compiler.pw;
         pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call),
                                val));
@@ -369,7 +391,7 @@ class ExprCompiler implements RexVisitor<String> {
       @Override
       public String translate(
           ExprCompiler compiler, RexCall call) {
-        String val = compiler.reserveName(call);
+        String val = compiler.reserveName();
         PrintWriter pw = compiler.pw;
         RexNode op = call.getOperands().get(0);
         String lhs = op.accept(compiler);

http://git-wip-us.apache.org/repos/asf/storm/blob/1d0155aa/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
index 9f516d4..f03cff8 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
@@ -28,8 +28,10 @@ import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 public class TestExprCompiler {
   @Test
@@ -62,7 +64,7 @@ public class TestExprCompiler {
       project.getChildExps().get(0).accept(compiler);
     }
 
-    assertEquals("int t0 = (int)(_data.get(0));\n", sw.toString());
+    assertThat(sw.toString(), containsString("(int)(_data.get(0));"));
   }
 
   @Test
@@ -83,7 +85,9 @@ public class TestExprCompiler {
         res[i] = project.getChildExps().get(i).accept(compiler);
       }
     }
-    assertArrayEquals(new String[]{"1 > 2", "3 + 5", "1 - 1.0E0", "3 + t0"},
-                      res);
+    assertThat(sw.get(0).toString(), containsString("1 > 2"));
+    assertThat(sw.get(1).toString(), containsString("3 + 5"));
+    assertThat(sw.get(2).toString(), containsString("1 - 1.0E0"));
+    assertThat(sw.get(3).toString(), containsString("3 +"));
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1d0155aa/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
index 7bde092..8454d7e 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -58,6 +58,15 @@ public class TestExprSemantic {
   }
 
   @Test
+  public void testArithmeticWithNull() throws Exception {
+    Values v = testExpr(
+      Lists.newArrayList(
+          "1 + CAST(NULL AS INT)", "CAST(NULL AS INT) + 1", "CAST(NULL AS INT) + CAST(NULL AS INT)", "1 + 2"
+      ));
+    assertEquals(new Values(null, null, null, 3), v);
+  }
+
+  @Test
   public void testNotWithNull() throws Exception {
     Values v = testExpr(
         Lists.newArrayList(

http://git-wip-us.apache.org/repos/asf/storm/blob/1d0155aa/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
index cedb48b..d820f22 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
@@ -23,12 +23,15 @@ import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
+import static org.hamcrest.CoreMatchers.*;
+
 public class TestRelNodeCompiler {
   @Test
   public void testFilter() throws Exception {
@@ -45,7 +48,7 @@ public class TestRelNodeCompiler {
       RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
       compiler.visitFilter(filter);
       pw.flush();
-      Assert.assertTrue(sw.toString().contains("t0 > 3"));
+      Assert.assertThat(sw.toString(), containsString("> 3"));
     }
 
     try (StringWriter sw = new StringWriter();
@@ -54,7 +57,7 @@ public class TestRelNodeCompiler {
       RelNodeCompiler compiler = new RelNodeCompiler(pw, typeFactory);
       compiler.visitProject(project);
       pw.flush();
-      Assert.assertTrue(sw.toString().contains("t0 + 1"));
+      Assert.assertThat(sw.toString(), containsString("+ 1"));
     }
   }
 }


[2/3] storm git commit: Implement nullable semantics for AND, OR and NOT operators.

Posted by sr...@apache.org.
Implement nullable semantics for AND, OR and NOT operators.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8890de3e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8890de3e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8890de3e

Branch: refs/heads/STORM-1040
Commit: 8890de3e098b759d0a2938e06b4841d371066b7e
Parents: 183b5ac
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Nov 4 09:24:32 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Wed Nov 4 09:24:32 2015 -0800

----------------------------------------------------------------------
 .../apache/storm/sql/compiler/ExprCompiler.java | 101 +++++++++++++++----
 .../storm/sql/compiler/TestExprSemantic.java    |  49 ++++++---
 2 files changed, 119 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8890de3e/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
index fd3a614..db97d43 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -17,7 +17,9 @@
  */
 package org.apache.storm.sql.compiler;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.adapter.enumerable.CallImplementor;
 import org.apache.calcite.adapter.enumerable.RexImpTable;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.rel.type.RelDataType;
@@ -67,9 +69,9 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.PLUS;
  */
 class ExprCompiler implements RexVisitor<String> {
   private final PrintWriter pw;
-  private final Map<RexNode, String> expr = new IdentityHashMap<>();
   private final JavaTypeFactory typeFactory;
   private static final ImpTable IMP_TABLE = new ImpTable();
+  private int nameCount;
 
   ExprCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
     this.pw = pw;
@@ -78,14 +80,10 @@ class ExprCompiler implements RexVisitor<String> {
 
   @Override
   public String visitInputRef(RexInputRef rexInputRef) {
-    if (expr.containsKey(rexInputRef)) {
-      return expr.get(rexInputRef);
-    }
     String name = reserveName(rexInputRef);
     String typeName = javaTypeName(rexInputRef);
     pw.print(String.format("%s %s = (%s)(_data.get(%d));\n", typeName, name,
                            typeName, rexInputRef.getIndex()));
-    expr.put(rexInputRef, name);
     return name;
   }
 
@@ -167,8 +165,7 @@ class ExprCompiler implements RexVisitor<String> {
   }
 
   private String reserveName(RexNode node) {
-    String name = "t" + expr.size();
-    expr.put(node, name);
+    String name = "t" + ++nameCount;
     return name;
   }
 
@@ -285,6 +282,9 @@ class ExprCompiler implements RexVisitor<String> {
     }
 
 
+    // If any of the arguments are false, result is false;
+    // else if any arguments are null, result is null;
+    // else true.
     private static final CallExprPrinter AND_EXPR = new CallExprPrinter() {
       @Override
       public String translate(
@@ -293,15 +293,40 @@ class ExprCompiler implements RexVisitor<String> {
         PrintWriter pw = compiler.pw;
         pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call),
                                val));
-        String lhs = call.getOperands().get(0).accept(compiler);
-        pw.print(String.format("if (!(%2$s)) { %1$s = false; }\n", val, lhs));
-        pw.print("else {\n");
-        String rhs = call.getOperands().get(1).accept(compiler);
-        pw.print(String.format("  %1$s = %2$s;\n}\n", val, rhs));
+        RexNode op0 = call.getOperands().get(0);
+        RexNode op1 = call.getOperands().get(1);
+        boolean lhsNullable = op0.getType().isNullable();
+        boolean rhsNullable = op1.getType().isNullable();
+        String lhs = op0.accept(compiler);
+        if (!lhsNullable) {
+          pw.print(String.format("if (!(%2$s)) { %1$s = false; }\n", val, lhs));
+          pw.print("else {\n");
+          String rhs = op1.accept(compiler);
+          pw.print(String.format("  %1$s = %2$s;\n}\n", val, rhs));
+        } else {
+          String foldedLHS = foldNullExpr(
+              String.format("%1$s == null || %1$s", lhs), "true", lhs);
+          pw.print(String.format("if (%s) {\n", foldedLHS));
+          String rhs = op1.accept(compiler);
+          String s;
+          if (rhsNullable) {
+            s = foldNullExpr(
+                String.format("(%2$s != null && !(%2$s)) ? false : %1$s", lhs,
+                              rhs),
+                "null", rhs);
+          } else {
+            s = String.format("!(%2$s) ? false : %1$s", lhs, rhs);
+          }
+          pw.print(String.format("  %1$s = %2$s;\n", val, s));
+          pw.print(String.format("} else { %1$s = false; }\n", val));
+        }
         return val;
       }
     };
 
+    // If any of the arguments are true, result is true;
+    // else if any arguments are null, result is null;
+    // else false.
     private static final CallExprPrinter OR_EXPR = new CallExprPrinter() {
       @Override
       public String translate(
@@ -310,11 +335,32 @@ class ExprCompiler implements RexVisitor<String> {
         PrintWriter pw = compiler.pw;
         pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call),
                                val));
-        String lhs = call.getOperands().get(0).accept(compiler);
-        pw.print(String.format("if (%2$s) { %1$s = true; }\n", val, lhs));
-        pw.print("else {\n");
-        String rhs = call.getOperands().get(1).accept(compiler);
-        pw.print(String.format("  %1$s = %2$s;\n}\n", val, rhs));
+        RexNode op0 = call.getOperands().get(0);
+        RexNode op1 = call.getOperands().get(1);
+        boolean lhsNullable = op0.getType().isNullable();
+        boolean rhsNullable = op1.getType().isNullable();
+        String lhs = op0.accept(compiler);
+        if (!lhsNullable) {
+          pw.print(String.format("if (%2$s) { %1$s = true; }\n", val, lhs));
+          pw.print("else {\n");
+          String rhs = op1.accept(compiler);
+          pw.print(String.format("  %1$s = %2$s;\n}\n", val, rhs));
+        } else {
+          String foldedLHS = foldNullExpr(
+              String.format("%1$s == null || !(%1$s)", lhs), "true", lhs);
+          pw.print(String.format("if (%s) {\n", foldedLHS));
+          String rhs = op1.accept(compiler);
+          String s;
+          if (rhsNullable) {
+            s = foldNullExpr(
+                String.format("(%2$s != null && %2$s) ? true : %1$s", lhs, rhs),
+                "null", rhs);
+          } else {
+            s = String.format("%2$s ? %2$s : %1$s", lhs, rhs);
+          }
+          pw.print(String.format("  %1$s = %2$s;\n", val, s));
+          pw.print(String.format("} else { %1$s = true; }\n", val));
+        }
         return val;
       }
     };
@@ -325,13 +371,30 @@ class ExprCompiler implements RexVisitor<String> {
           ExprCompiler compiler, RexCall call) {
         String val = compiler.reserveName(call);
         PrintWriter pw = compiler.pw;
-        String lhs = call.getOperands().get(0).accept(compiler);
+        RexNode op = call.getOperands().get(0);
+        String lhs = op.accept(compiler);
+        boolean nullable = call.getType().isNullable();
         pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call),
                                val));
-        pw.print(String.format("%1$s = !(%2$s);\n", val, lhs));
+        if (!nullable) {
+          pw.print(String.format("%1$s = !(%2$s);\n", val, lhs));
+        } else {
+          String s = foldNullExpr(
+              String.format("%1$s == null ? null : !(%1$s)", lhs), "null", lhs);
+          pw.print(String.format("%1$s = %2$s;\n", val, s));
+        }
         return val;
       }
     };
+
+    private static String foldNullExpr(String notNullExpr, String
+        nullExpr, String op) {
+      if (op.equals("null")) {
+        return nullExpr;
+      } else {
+        return notNullExpr;
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8890de3e/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
index bda9e00..7bde092 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -43,8 +43,7 @@ public class TestExprSemantic {
   public void testLogicalExpr() throws Exception {
     Values v = testExpr(
         Lists.newArrayList("ID > 0 OR ID < 1", "ID > 0 AND ID < 1",
-                           "NOT (ID > 0 AND ID < 1)"),
-        "WHERE ID > 0 AND ID < 2");
+                           "NOT (ID > 0 AND ID < 1)"));
     assertEquals(new Values(true, false, true), v);
   }
 
@@ -58,12 +57,42 @@ public class TestExprSemantic {
     assertEquals(new Values(true, false, false, true, false, true, false), v);
   }
 
-  private Values testExpr(List<String> exprs, String additionalCaluse)
-      throws Exception {
-    String sql = "SELECT " + Joiner.on(',').join(exprs) + " FROM FOO";
-    if (additionalCaluse != null) {
-      sql += " " + additionalCaluse;
-    }
+  @Test
+  public void testNotWithNull() throws Exception {
+    Values v = testExpr(
+        Lists.newArrayList(
+            "NOT TRUE", "NOT FALSE", "NOT UNKNOWN"
+        ));
+    assertEquals(new Values(false, true, null), v);
+  }
+
+  @Test
+  public void testAndWithNull() throws Exception {
+    Values v = testExpr(
+        Lists.newArrayList(
+            "UNKNOWN AND TRUE", "UNKNOWN AND FALSE", "UNKNOWN AND UNKNOWN",
+            "TRUE AND TRUE", "TRUE AND FALSE", "TRUE AND UNKNOWN",
+            "FALSE AND TRUE", "FALSE AND FALSE", "FALSE AND UNKNOWN"
+        ));
+    assertEquals(new Values(null, false, null, true, false, null, false,
+                            false, false), v);
+  }
+
+  @Test
+  public void testOrWithNull() throws Exception {
+    Values v = testExpr(
+        Lists.newArrayList(
+            "UNKNOWN OR TRUE", "UNKNOWN OR FALSE", "UNKNOWN OR UNKNOWN",
+            "TRUE OR TRUE", "TRUE OR FALSE", "TRUE OR UNKNOWN",
+            "FALSE OR TRUE", "FALSE OR FALSE", "FALSE OR UNKNOWN"
+            ));
+    assertEquals(new Values(true, null, null, true, true, true, true,
+                            false, null), v);
+  }
+
+  private Values testExpr(List<String> exprs) throws Exception {
+    String sql = "SELECT " + Joiner.on(',').join(exprs) + " FROM FOO" +
+        " WHERE ID > 0 AND ID < 2";
     TestUtils.CalciteState state = TestUtils.sqlOverDummyTable(sql);
     PlanCompiler compiler = new PlanCompiler(typeFactory);
     AbstractValuesProcessor proc = compiler.compile(state.tree);
@@ -75,8 +104,4 @@ public class TestExprSemantic {
     return values.get(0);
   }
 
-  private Values testExpr(List<String> exprs) throws Exception {
-    return testExpr(exprs, null);
-  }
-
 }