You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lz...@apache.org on 2017/07/19 13:40:51 UTC

[1/2] beam git commit: [BEAM-2613] add integration test for comparison operators

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL bff21499c -> 97a156c30


[BEAM-2613] add integration test for comparison operators


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5445668
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5445668
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5445668

Branch: refs/heads/DSL_SQL
Commit: c5445668dd20fdef50554a0e75945d292dce36c7
Parents: bff2149
Author: James Xu <xu...@gmail.com>
Authored: Thu Jul 13 18:24:55 2017 +0800
Committer: JingsongLi <lz...@aliyun.com>
Committed: Wed Jul 19 21:38:42 2017 +0800

----------------------------------------------------------------------
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java |  26 +-
 .../operator/BeamSqlCompareExpression.java      |  93 ------
 .../operator/BeamSqlEqualExpression.java        |  48 ---
 .../operator/BeamSqlIsNotNullExpression.java    |  51 ---
 .../operator/BeamSqlIsNullExpression.java       |  51 ---
 .../BeamSqlLargerThanEqualExpression.java       |  48 ---
 .../operator/BeamSqlLargerThanExpression.java   |  48 ---
 .../BeamSqlLessThanEqualExpression.java         |  48 ---
 .../operator/BeamSqlLessThanExpression.java     |  48 ---
 .../operator/BeamSqlNotEqualExpression.java     |  48 ---
 .../comparison/BeamSqlCompareExpression.java    |  96 ++++++
 .../comparison/BeamSqlEqualsExpression.java     |  49 +++
 .../BeamSqlGreaterThanExpression.java           |  49 +++
 .../BeamSqlGreaterThanOrEqualsExpression.java   |  49 +++
 .../comparison/BeamSqlIsNotNullExpression.java  |  53 +++
 .../comparison/BeamSqlIsNullExpression.java     |  53 +++
 .../comparison/BeamSqlLessThanExpression.java   |  49 +++
 .../BeamSqlLessThanOrEqualsExpression.java      |  49 +++
 .../comparison/BeamSqlNotEqualsExpression.java  |  49 +++
 .../operator/comparison/package-info.java       |  22 ++
 ...amSqlComparisonOperatorsIntegrationTest.java | 330 +++++++++++++++++++
 .../sql/interpreter/BeamSqlFnExecutorTest.java  |   8 +-
 .../operator/BeamNullExperssionTest.java        |   2 +
 .../operator/BeamSqlCompareExpressionTest.java  |  27 +-
 24 files changed, 884 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
index 0db33cb..aee0e4a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -23,16 +23,8 @@ import java.util.Calendar;
 import java.util.List;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCastExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNotNullExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNullExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlReinterpretExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
@@ -44,6 +36,14 @@ import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpr
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
@@ -208,22 +208,22 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
           ret = new BeamSqlNotExpression(subExps);
           break;
         case "=":
-          ret = new BeamSqlEqualExpression(subExps);
+          ret = new BeamSqlEqualsExpression(subExps);
           break;
         case "<>":
-          ret = new BeamSqlNotEqualExpression(subExps);
+          ret = new BeamSqlNotEqualsExpression(subExps);
           break;
         case ">":
-          ret = new BeamSqlLargerThanExpression(subExps);
+          ret = new BeamSqlGreaterThanExpression(subExps);
           break;
         case ">=":
-          ret = new BeamSqlLargerThanEqualExpression(subExps);
+          ret = new BeamSqlGreaterThanOrEqualsExpression(subExps);
           break;
         case "<":
           ret = new BeamSqlLessThanExpression(subExps);
           break;
         case "<=":
-          ret = new BeamSqlLessThanEqualExpression(subExps);
+          ret = new BeamSqlLessThanOrEqualsExpression(subExps);
           break;
 
         // arithmetic operators

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
deleted file mode 100644
index 5076ccc..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@link BeamSqlCompareExpression} is used for compare operations.
- *
- * <p>See {@link BeamSqlEqualExpression}, {@link BeamSqlLessThanExpression},
- * {@link BeamSqlLessThanEqualExpression}, {@link BeamSqlLargerThanExpression},
- * {@link BeamSqlLargerThanEqualExpression} and {@link BeamSqlNotEqualExpression} for more details.
- *
- */
-public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
-
-  private BeamSqlCompareExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  public BeamSqlCompareExpression(List<BeamSqlExpression> operands) {
-    this(operands, SqlTypeName.BOOLEAN);
-  }
-
-  /**
-   * Compare operation must have 2 operands.
-   */
-  @Override
-  public boolean accept() {
-    return operands.size() == 2;
-  }
-
-  @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
-    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
-    Object rightValue = operands.get(1).evaluate(inputRow).getValue();
-    switch (operands.get(0).outputType) {
-    case BIGINT:
-    case DECIMAL:
-    case DOUBLE:
-    case FLOAT:
-    case INTEGER:
-    case SMALLINT:
-    case TINYINT:
-      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
-          compare((Number) leftValue, (Number) rightValue));
-    case BOOLEAN:
-      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
-          compare((Boolean) leftValue, (Boolean) rightValue));
-    case VARCHAR:
-      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
-          compare((CharSequence) leftValue, (CharSequence) rightValue));
-    default:
-      throw new UnsupportedOperationException(toString());
-    }
-  }
-
-  /**
-   * Compare between String values, mapping to {@link SqlTypeName#VARCHAR}.
-   */
-  public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue);
-
-  /**
-   * Compare between Boolean values, mapping to {@link SqlTypeName#BOOLEAN}.
-   */
-  public abstract Boolean compare(Boolean leftValue, Boolean rightValue);
-
-  /**
-   * Compare between Number values, including {@link SqlTypeName#BIGINT},
-   * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT},
-   * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and {@link SqlTypeName#TINYINT}.
-   */
-  public abstract Boolean compare(Number leftValue, Number rightValue);
-
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java
deleted file mode 100644
index 4bc487b..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.List;
-
-/**
- * {@code BeamSqlExpression} for {@code =} operation.
- */
-public class BeamSqlEqualExpression extends BeamSqlCompareExpression {
-
-  public BeamSqlEqualExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override
-  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
-    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) == 0;
-  }
-
-  @Override
-  public Boolean compare(Boolean leftValue, Boolean rightValue) {
-    return !(leftValue ^ rightValue);
-  }
-
-  @Override
-  public Boolean compare(Number leftValue, Number rightValue) {
-    return (leftValue == null && rightValue == null)
-        || (leftValue != null && rightValue != null
-              && leftValue.floatValue() == (rightValue).floatValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
deleted file mode 100644
index 23d9c83..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} for 'IS NOT NULL' operation.
- */
-public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
-
-  private BeamSqlIsNotNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  public BeamSqlIsNotNullExpression(BeamSqlExpression operand){
-    this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
-  }
-
-  /**
-   * only one operand is required.
-   */
-  @Override
-  public boolean accept() {
-    return operands.size() == 1;
-  }
-
-  @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
-    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
-    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
deleted file mode 100644
index 4d3fd45..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} for 'IS NULL' operation.
- */
-public class BeamSqlIsNullExpression extends BeamSqlExpression {
-
-  private BeamSqlIsNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  public BeamSqlIsNullExpression(BeamSqlExpression operand){
-    this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
-  }
-
-  /**
-   * only one operand is required.
-   */
-  @Override
-  public boolean accept() {
-    return operands.size() == 1;
-  }
-
-  @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
-    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
-    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java
deleted file mode 100644
index 76ca71d..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.List;
-
-/**
- * {@code BeamSqlExpression} for {@code >=} operation.
- */
-public class BeamSqlLargerThanEqualExpression extends BeamSqlCompareExpression {
-
-  public BeamSqlLargerThanEqualExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override
-  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
-    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) >= 0;
-  }
-
-  @Override
-  public Boolean compare(Boolean leftValue, Boolean rightValue) {
-    throw new IllegalArgumentException(">= is not supported for Boolean.");
-  }
-
-  @Override
-  public Boolean compare(Number leftValue, Number rightValue) {
-    return (leftValue == null && rightValue == null)
-        || (leftValue != null && rightValue != null
-              && leftValue.floatValue() >= (rightValue).floatValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java
deleted file mode 100644
index c7ce836..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.List;
-
-/**
- * {@code BeamSqlExpression} for {@code >} operation.
- */
-public class BeamSqlLargerThanExpression extends BeamSqlCompareExpression {
-
-  public BeamSqlLargerThanExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override
-  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
-    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) > 0;
-  }
-
-  @Override
-  public Boolean compare(Boolean leftValue, Boolean rightValue) {
-    throw new IllegalArgumentException("> is not supported for Boolean.");
-  }
-
-  @Override
-  public Boolean compare(Number leftValue, Number rightValue) {
-    return (leftValue == null && rightValue == null)
-        || (leftValue != null && rightValue != null
-              && leftValue.floatValue() > (rightValue).floatValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java
deleted file mode 100644
index 1791b79..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.List;
-
-/**
- * {@code BeamSqlExpression} for {@code <=} operation.
- */
-public class BeamSqlLessThanEqualExpression extends BeamSqlCompareExpression {
-
-  public BeamSqlLessThanEqualExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override
-  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
-    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) <= 0;
-  }
-
-  @Override
-  public Boolean compare(Boolean leftValue, Boolean rightValue) {
-    throw new IllegalArgumentException("<= is not supported for Boolean.");
-  }
-
-  @Override
-  public Boolean compare(Number leftValue, Number rightValue) {
-    return (leftValue == null && rightValue == null)
-        || (leftValue != null && rightValue != null
-              && leftValue.floatValue() <= (rightValue).floatValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java
deleted file mode 100644
index 7382d92..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.List;
-
-/**
- * {@code BeamSqlExpression} for {@code <} operation.
- */
-public class BeamSqlLessThanExpression extends BeamSqlCompareExpression {
-
-  public BeamSqlLessThanExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override
-  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
-    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) < 0;
-  }
-
-  @Override
-  public Boolean compare(Boolean leftValue, Boolean rightValue) {
-    throw new IllegalArgumentException("< is not supported for Boolean.");
-  }
-
-  @Override
-  public Boolean compare(Number leftValue, Number rightValue) {
-    return (leftValue == null && rightValue == null)
-        || (leftValue != null && rightValue != null
-              && leftValue.floatValue() < (rightValue).floatValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java
deleted file mode 100644
index 2b093bf..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.List;
-
-/**
- * {@code BeamSqlExpression} for {@code <>} operation.
- */
-public class BeamSqlNotEqualExpression extends BeamSqlCompareExpression {
-
-  public BeamSqlNotEqualExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override
-  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
-    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) != 0;
-  }
-
-  @Override
-  public Boolean compare(Boolean leftValue, Boolean rightValue) {
-    return leftValue ^ rightValue;
-  }
-
-  @Override
-  public Boolean compare(Number leftValue, Number rightValue) {
-    return (leftValue == null && rightValue == null)
-        || (leftValue != null && rightValue != null
-              && leftValue.floatValue() != (rightValue).floatValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
new file mode 100644
index 0000000..80f0853
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for binary comparison operators; every comparison yields a BOOLEAN value.
+ *
+ * <p>Concrete operators: {@link BeamSqlEqualsExpression}, {@link BeamSqlNotEqualsExpression},
+ * {@link BeamSqlLessThanExpression}, {@link BeamSqlLessThanOrEqualsExpression},
+ * {@link BeamSqlGreaterThanExpression} and {@link BeamSqlGreaterThanOrEqualsExpression}.
+ */
+public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
+
+  /** Creates a comparison over {@code operands}; its output type is always BOOLEAN. */
+  public BeamSqlCompareExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * A comparison takes exactly two operands.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 2;
+  }
+
+  /**
+   * Evaluates both operands and dispatches on the output type of the left operand.
+   *
+   * @throws UnsupportedOperationException if that type has no matching {@code compare} overload
+   */
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object lhs = operands.get(0).evaluate(inputRow).getValue();
+    Object rhs = operands.get(1).evaluate(inputRow).getValue();
+    final Boolean outcome;
+    switch (operands.get(0).getOutputType()) {
+    case BIGINT:
+    case DECIMAL:
+    case DOUBLE:
+    case FLOAT:
+    case INTEGER:
+    case SMALLINT:
+    case TINYINT:
+      outcome = compare((Number) lhs, (Number) rhs);
+      break;
+    case BOOLEAN:
+      outcome = compare((Boolean) lhs, (Boolean) rhs);
+      break;
+    case VARCHAR:
+      outcome = compare((CharSequence) lhs, (CharSequence) rhs);
+      break;
+    default:
+      throw new UnsupportedOperationException(toString());
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, outcome);
+  }
+
+  /**
+   * Compares two character values; selected for {@link SqlTypeName#VARCHAR}.
+   */
+  public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue);
+
+  /**
+   * Compares two boolean values; selected for {@link SqlTypeName#BOOLEAN}.
+   */
+  public abstract Boolean compare(Boolean leftValue, Boolean rightValue);
+
+  /**
+   * Compares two numeric values; selected for {@link SqlTypeName#BIGINT},
+   * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT},
+   * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and {@link SqlTypeName#TINYINT}.
+   */
+  public abstract Boolean compare(Number leftValue, Number rightValue);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
new file mode 100644
index 0000000..40b015e
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code =} operation; two {@code null} operands are equal.
+ */
+public class BeamSqlEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) == 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    return leftValue == null ? rightValue == null : leftValue.equals(rightValue);
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    // Compare as double; float cannot represent whole numbers above 2^24 exactly.
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.doubleValue() == rightValue.doubleValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
new file mode 100644
index 0000000..8bfa511
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code >} operation.
+ */
+public class BeamSqlGreaterThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlGreaterThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) > 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("> is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    // A null operand never satisfies a strict comparison.
+    // Compare as double; float cannot represent whole numbers above 2^24 exactly.
+    return leftValue != null && rightValue != null
+        && leftValue.doubleValue() > rightValue.doubleValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
new file mode 100644
index 0000000..54faa35
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code >=} operation; two {@code null} numbers satisfy it.
+ */
+public class BeamSqlGreaterThanOrEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlGreaterThanOrEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) >= 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException(">= is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    // Compare as double; float cannot represent whole numbers above 2^24 exactly.
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.doubleValue() >= rightValue.doubleValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
new file mode 100644
index 0000000..6d93c5d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for the {@code IS NOT NULL} operation.
+ */
+public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
+
+  /** Creates the check for a single operand; the output type is always BOOLEAN. */
+  public BeamSqlIsNotNullExpression(BeamSqlExpression operand) {
+    this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+  }
+
+  private BeamSqlIsNotNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /** Exactly one operand is expected. */
+  @Override
+  public boolean accept() {
+    return operands.size() == 1;
+  }
+
+  /** Yields BOOLEAN true iff the operand evaluates to a non-null value for {@code inputRow}. */
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    boolean operandPresent = operands.get(0).evaluate(inputRow).getValue() != null;
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, operandPresent);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
new file mode 100644
index 0000000..4450f3a
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for the {@code IS NULL} operation.
+ */
+public class BeamSqlIsNullExpression extends BeamSqlExpression {
+
+  /** Creates the check for a single operand; the output type is always BOOLEAN. */
+  public BeamSqlIsNullExpression(BeamSqlExpression operand) {
+    this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+  }
+
+  private BeamSqlIsNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /** Exactly one operand is expected. */
+  @Override
+  public boolean accept() {
+    return operands.size() == 1;
+  }
+
+  /** Yields BOOLEAN true iff the operand evaluates to {@code null} for {@code inputRow}. */
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    boolean operandIsNull = operands.get(0).evaluate(inputRow).getValue() == null;
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, operandIsNull);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
new file mode 100644
index 0000000..7ae6dad
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code <} operation.
+ */
+public class BeamSqlLessThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) < 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("< is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    // A null operand never satisfies a strict comparison.
+    // Compare as double; float cannot represent whole numbers above 2^24 exactly.
+    return leftValue != null && rightValue != null
+        && leftValue.doubleValue() < rightValue.doubleValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
new file mode 100644
index 0000000..4a2cef2
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code <=} operation; two {@code null} numbers satisfy it.
+ */
+public class BeamSqlLessThanOrEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanOrEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) <= 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("<= is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    // Compare as double; float cannot represent whole numbers above 2^24 exactly.
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.doubleValue() <= rightValue.doubleValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
new file mode 100644
index 0000000..e02df3d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code <>} operation.
+ */
+public class BeamSqlNotEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlNotEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) != 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    return leftValue != null && rightValue != null && !leftValue.equals(rightValue);
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    // A null operand never satisfies <>; only two non-null values can differ.
+    // Compare as double; float cannot represent whole numbers above 2^24 exactly.
+    return leftValue != null && rightValue != null
+        && leftValue.doubleValue() != rightValue.doubleValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java
new file mode 100644
index 0000000..eea18ff
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Comparison operators.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
new file mode 100644
index 0000000..5502ad4
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.integrationtest;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+
+/**
+ * Integration test for comparison operators.
+ */
+public class BeamSqlComparisonOperatorsIntegrationTest
+    extends BeamSqlBuiltinFunctionsIntegrationTestBase {
+
+  @Test
+  public void testEquals() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_1 = c_tinyint_1", true)
+        .addExpr("c_tinyint_1 = c_tinyint_2", false)
+        .addExpr("c_smallint_1 = c_smallint_1", true)
+        .addExpr("c_smallint_1 = c_smallint_2", false)
+        .addExpr("c_integer_1 = c_integer_1", true)
+        .addExpr("c_integer_1 = c_integer_2", false)
+        .addExpr("c_bigint_1 = c_bigint_1", true)
+        .addExpr("c_bigint_1 = c_bigint_2", false)
+        .addExpr("c_float_1 = c_float_1", true)
+        .addExpr("c_float_1 = c_float_2", false)
+        .addExpr("c_double_1 = c_double_1", true)
+        .addExpr("c_double_1 = c_double_2", false)
+        .addExpr("c_decimal_1 = c_decimal_1", true)
+        .addExpr("c_decimal_1 = c_decimal_2", false)
+        .addExpr("c_varchar_1 = c_varchar_1", true)
+        .addExpr("c_varchar_1 = c_varchar_2", false)
+        .addExpr("c_boolean_true = c_boolean_true", true)
+        .addExpr("c_boolean_true = c_boolean_false", false)
+
+        ;
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testNotEquals() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_1 <> c_tinyint_1", false)
+        .addExpr("c_tinyint_1 <> c_tinyint_2", true)
+        .addExpr("c_smallint_1 <> c_smallint_1", false)
+        .addExpr("c_smallint_1 <> c_smallint_2", true)
+        .addExpr("c_integer_1 <> c_integer_1", false)
+        .addExpr("c_integer_1 <> c_integer_2", true)
+        .addExpr("c_bigint_1 <> c_bigint_1", false)
+        .addExpr("c_bigint_1 <> c_bigint_2", true)
+        .addExpr("c_float_1 <> c_float_1", false)
+        .addExpr("c_float_1 <> c_float_2", true)
+        .addExpr("c_double_1 <> c_double_1", false)
+        .addExpr("c_double_1 <> c_double_2", true)
+        .addExpr("c_decimal_1 <> c_decimal_1", false)
+        .addExpr("c_decimal_1 <> c_decimal_2", true)
+        .addExpr("c_varchar_1 <> c_varchar_1", false)
+        .addExpr("c_varchar_1 <> c_varchar_2", true)
+        .addExpr("c_boolean_true <> c_boolean_true", false)
+        .addExpr("c_boolean_true <> c_boolean_false", true)
+        ;
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testGreaterThan() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_2 > c_tinyint_1", true)
+        .addExpr("c_tinyint_1 > c_tinyint_1", false)
+        .addExpr("c_tinyint_1 > c_tinyint_2", false)
+
+        .addExpr("c_smallint_2 > c_smallint_1", true)
+        .addExpr("c_smallint_1 > c_smallint_1", false)
+        .addExpr("c_smallint_1 > c_smallint_2", false)
+
+        .addExpr("c_integer_2 > c_integer_1", true)
+        .addExpr("c_integer_1 > c_integer_1", false)
+        .addExpr("c_integer_1 > c_integer_2", false)
+
+        .addExpr("c_bigint_2 > c_bigint_1", true)
+        .addExpr("c_bigint_1 > c_bigint_1", false)
+        .addExpr("c_bigint_1 > c_bigint_2", false)
+
+        .addExpr("c_float_2 > c_float_1", true)
+        .addExpr("c_float_1 > c_float_1", false)
+        .addExpr("c_float_1 > c_float_2", false)
+
+        .addExpr("c_double_2 > c_double_1", true)
+        .addExpr("c_double_1 > c_double_1", false)
+        .addExpr("c_double_1 > c_double_2", false)
+
+        .addExpr("c_decimal_2 > c_decimal_1", true)
+        .addExpr("c_decimal_1 > c_decimal_1", false)
+        .addExpr("c_decimal_1 > c_decimal_2", false)
+
+        .addExpr("c_varchar_2 > c_varchar_1", true)
+        .addExpr("c_varchar_1 > c_varchar_1", false)
+        .addExpr("c_varchar_1 > c_varchar_2", false)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testGreaterThanException() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_boolean_false > c_boolean_true", false);
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testGreaterThanOrEquals() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_2 >= c_tinyint_1", true)
+        .addExpr("c_tinyint_1 >= c_tinyint_1", true)
+        .addExpr("c_tinyint_1 >= c_tinyint_2", false)
+
+        .addExpr("c_smallint_2 >= c_smallint_1", true)
+        .addExpr("c_smallint_1 >= c_smallint_1", true)
+        .addExpr("c_smallint_1 >= c_smallint_2", false)
+
+        .addExpr("c_integer_2 >= c_integer_1", true)
+        .addExpr("c_integer_1 >= c_integer_1", true)
+        .addExpr("c_integer_1 >= c_integer_2", false)
+
+        .addExpr("c_bigint_2 >= c_bigint_1", true)
+        .addExpr("c_bigint_1 >= c_bigint_1", true)
+        .addExpr("c_bigint_1 >= c_bigint_2", false)
+
+        .addExpr("c_float_2 >= c_float_1", true)
+        .addExpr("c_float_1 >= c_float_1", true)
+        .addExpr("c_float_1 >= c_float_2", false)
+
+        .addExpr("c_double_2 >= c_double_1", true)
+        .addExpr("c_double_1 >= c_double_1", true)
+        .addExpr("c_double_1 >= c_double_2", false)
+
+        .addExpr("c_decimal_2 >= c_decimal_1", true)
+        .addExpr("c_decimal_1 >= c_decimal_1", true)
+        .addExpr("c_decimal_1 >= c_decimal_2", false)
+
+        .addExpr("c_varchar_2 >= c_varchar_1", true)
+        .addExpr("c_varchar_1 >= c_varchar_1", true)
+        .addExpr("c_varchar_1 >= c_varchar_2", false)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testGreaterThanOrEqualsException() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_boolean_false >= c_boolean_true", false);
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testLessThan() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_2 < c_tinyint_1", false)
+        .addExpr("c_tinyint_1 < c_tinyint_1", false)
+        .addExpr("c_tinyint_1 < c_tinyint_2", true)
+
+        .addExpr("c_smallint_2 < c_smallint_1", false)
+        .addExpr("c_smallint_1 < c_smallint_1", false)
+        .addExpr("c_smallint_1 < c_smallint_2", true)
+
+        .addExpr("c_integer_2 < c_integer_1", false)
+        .addExpr("c_integer_1 < c_integer_1", false)
+        .addExpr("c_integer_1 < c_integer_2", true)
+
+        .addExpr("c_bigint_2 < c_bigint_1", false)
+        .addExpr("c_bigint_1 < c_bigint_1", false)
+        .addExpr("c_bigint_1 < c_bigint_2", true)
+
+        .addExpr("c_float_2 < c_float_1", false)
+        .addExpr("c_float_1 < c_float_1", false)
+        .addExpr("c_float_1 < c_float_2", true)
+
+        .addExpr("c_double_2 < c_double_1", false)
+        .addExpr("c_double_1 < c_double_1", false)
+        .addExpr("c_double_1 < c_double_2", true)
+
+        .addExpr("c_decimal_2 < c_decimal_1", false)
+        .addExpr("c_decimal_1 < c_decimal_1", false)
+        .addExpr("c_decimal_1 < c_decimal_2", true)
+
+        .addExpr("c_varchar_2 < c_varchar_1", false)
+        .addExpr("c_varchar_1 < c_varchar_1", false)
+        .addExpr("c_varchar_1 < c_varchar_2", true)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testLessThanException() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_boolean_false < c_boolean_true", false);
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testLessThanOrEquals() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_tinyint_2 <= c_tinyint_1", false)
+        .addExpr("c_tinyint_1 <= c_tinyint_1", true)
+        .addExpr("c_tinyint_1 <= c_tinyint_2", true)
+
+        .addExpr("c_smallint_2 <= c_smallint_1", false)
+        .addExpr("c_smallint_1 <= c_smallint_1", true)
+        .addExpr("c_smallint_1 <= c_smallint_2", true)
+
+        .addExpr("c_integer_2 <= c_integer_1", false)
+        .addExpr("c_integer_1 <= c_integer_1", true)
+        .addExpr("c_integer_1 <= c_integer_2", true)
+
+        .addExpr("c_bigint_2 <= c_bigint_1", false)
+        .addExpr("c_bigint_1 <= c_bigint_1", true)
+        .addExpr("c_bigint_1 <= c_bigint_2", true)
+
+        .addExpr("c_float_2 <= c_float_1", false)
+        .addExpr("c_float_1 <= c_float_1", true)
+        .addExpr("c_float_1 <= c_float_2", true)
+
+        .addExpr("c_double_2 <= c_double_1", false)
+        .addExpr("c_double_1 <= c_double_1", true)
+        .addExpr("c_double_1 <= c_double_2", true)
+
+        .addExpr("c_decimal_2 <= c_decimal_1", false)
+        .addExpr("c_decimal_1 <= c_decimal_1", true)
+        .addExpr("c_decimal_1 <= c_decimal_2", true)
+
+        .addExpr("c_varchar_2 <= c_varchar_1", false)
+        .addExpr("c_varchar_1 <= c_varchar_1", true)
+        .addExpr("c_varchar_1 <= c_varchar_2", true)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testLessThanOrEqualsException() {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("c_boolean_false <= c_boolean_true", false);
+    checker.buildRunAndCheck();
+  }
+
+  @Test
+  public void testIsNullAndIsNotNull() throws Exception {
+    ExpressionChecker checker = new ExpressionChecker()
+        .addExpr("1 IS NOT NULL", true)
+        .addExpr("NULL IS NOT NULL", false)
+
+        .addExpr("1 IS NULL", false)
+        .addExpr("NULL IS NULL", true)
+        ;
+
+    checker.buildRunAndCheck();
+  }
+
+  @Override protected PCollection<BeamSqlRow> getTestPCollection() {
+    BeamSqlRowType type = BeamSqlRowType.create(
+        Arrays.asList(
+            "c_tinyint_0", "c_tinyint_1", "c_tinyint_2",
+            "c_smallint_0", "c_smallint_1", "c_smallint_2",
+            "c_integer_0", "c_integer_1", "c_integer_2",
+            "c_bigint_0", "c_bigint_1", "c_bigint_2",
+            "c_float_0", "c_float_1", "c_float_2",
+            "c_double_0", "c_double_1", "c_double_2",
+            "c_decimal_0", "c_decimal_1", "c_decimal_2",
+            "c_varchar_0", "c_varchar_1", "c_varchar_2",
+            "c_boolean_false", "c_boolean_true"
+            ),
+        Arrays.asList(
+            Types.TINYINT, Types.TINYINT, Types.TINYINT,
+            Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
+            Types.INTEGER, Types.INTEGER, Types.INTEGER,
+            Types.BIGINT, Types.BIGINT, Types.BIGINT,
+            Types.FLOAT, Types.FLOAT, Types.FLOAT,
+            Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
+            Types.DECIMAL, Types.DECIMAL, Types.DECIMAL,
+            Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
+            Types.BOOLEAN, Types.BOOLEAN
+        )
+    );
+    try {
+      return MockedBoundedTable
+          .of(type)
+          .addRows(
+              (byte) 0, (byte) 1, (byte) 2,
+              (short) 0, (short) 1, (short) 2,
+              0, 1, 2,
+              0L, 1L, 2L,
+              0.0f, 1.0f, 2.0f,
+              0.0, 1.0, 2.0,
+              BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.ONE.add(BigDecimal.ONE),
+              "a", "b", "c",
+              false, true
+          )
+          .buildIOReader(pipeline)
+          .setCoder(new BeamSqlRowCoder(type));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
index e9bdf07..15d5a52 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
@@ -25,16 +25,16 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.TimeZone;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
@@ -99,8 +99,8 @@ public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
     BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0);
     BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1);
 
-    assertTrue(l1Left instanceof BeamSqlLessThanEqualExpression);
-    assertTrue(l1Right instanceof BeamSqlEqualExpression);
+    assertTrue(l1Left instanceof BeamSqlLessThanOrEqualsExpression);
+    assertTrue(l1Right instanceof BeamSqlEqualsExpression);
 
     Assert.assertEquals(2, l1Left.getOperands().size());
     BeamSqlExpression l1LeftLeft = (BeamSqlExpression) l1Left.getOperands().get(0);

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
index b0cc84d..8ff105e 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.dsls.sql.interpreter.operator;
 
 import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/beam/blob/c5445668/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
index c76fa1c..50f1b78 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
@@ -19,6 +19,13 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.Arrays;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlCompareExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.Test;
@@ -30,12 +37,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testEqual() {
-    BeamSqlEqualExpression exp1 = new BeamSqlEqualExpression(
+    BeamSqlEqualsExpression exp1 = new BeamSqlEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 100L)));
     Assert.assertEquals(false, exp1.evaluate(record).getValue());
 
-    BeamSqlEqualExpression exp2 = new BeamSqlEqualExpression(
+    BeamSqlEqualsExpression exp2 = new BeamSqlEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
     Assert.assertEquals(true, exp2.evaluate(record).getValue());
@@ -43,12 +50,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testLargerThan(){
-    BeamSqlLargerThanExpression exp1 = new BeamSqlLargerThanExpression(
+    BeamSqlGreaterThanExpression exp1 = new BeamSqlGreaterThanExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
     Assert.assertEquals(false, exp1.evaluate(record).getValue());
 
-    BeamSqlLargerThanExpression exp2 = new BeamSqlLargerThanExpression(
+    BeamSqlGreaterThanExpression exp2 = new BeamSqlGreaterThanExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234566L)));
     Assert.assertEquals(true, exp2.evaluate(record).getValue());
@@ -56,12 +63,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testLargerThanEqual(){
-    BeamSqlLargerThanEqualExpression exp1 = new BeamSqlLargerThanEqualExpression(
+    BeamSqlGreaterThanOrEqualsExpression exp1 = new BeamSqlGreaterThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
     Assert.assertEquals(true, exp1.evaluate(record).getValue());
 
-    BeamSqlLargerThanEqualExpression exp2 = new BeamSqlLargerThanEqualExpression(
+    BeamSqlGreaterThanOrEqualsExpression exp2 = new BeamSqlGreaterThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234568L)));
     Assert.assertEquals(false, exp2.evaluate(record).getValue());
@@ -82,12 +89,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testLessThanEqual(){
-    BeamSqlLessThanEqualExpression exp1 = new BeamSqlLessThanEqualExpression(
+    BeamSqlLessThanOrEqualsExpression exp1 = new BeamSqlLessThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2),
             BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.9)));
     Assert.assertEquals(true, exp1.evaluate(record).getValue());
 
-    BeamSqlLessThanEqualExpression exp2 = new BeamSqlLessThanEqualExpression(
+    BeamSqlLessThanOrEqualsExpression exp2 = new BeamSqlLessThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2),
             BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.0)));
     Assert.assertEquals(false, exp2.evaluate(record).getValue());
@@ -95,12 +102,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testNotEqual(){
-    BeamSqlNotEqualExpression exp1 = new BeamSqlNotEqualExpression(
+    BeamSqlNotEqualsExpression exp1 = new BeamSqlNotEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
     Assert.assertEquals(false, exp1.evaluate(record).getValue());
 
-    BeamSqlNotEqualExpression exp2 = new BeamSqlNotEqualExpression(
+    BeamSqlNotEqualsExpression exp2 = new BeamSqlNotEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 0L)));
     Assert.assertEquals(true, exp2.evaluate(record).getValue());


[2/2] beam git commit: This closes #3583

Posted by lz...@apache.org.
This closes #3583


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97a156c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97a156c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97a156c3

Branch: refs/heads/DSL_SQL
Commit: 97a156c30fbe4b0f1c12b1b3946e8503fccca21a
Parents: bff2149 c544566
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Jul 19 21:39:16 2017 +0800
Committer: JingsongLi <lz...@aliyun.com>
Committed: Wed Jul 19 21:39:16 2017 +0800

----------------------------------------------------------------------
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java |  26 +-
 .../operator/BeamSqlCompareExpression.java      |  93 ------
 .../operator/BeamSqlEqualExpression.java        |  48 ---
 .../operator/BeamSqlIsNotNullExpression.java    |  51 ---
 .../operator/BeamSqlIsNullExpression.java       |  51 ---
 .../BeamSqlLargerThanEqualExpression.java       |  48 ---
 .../operator/BeamSqlLargerThanExpression.java   |  48 ---
 .../BeamSqlLessThanEqualExpression.java         |  48 ---
 .../operator/BeamSqlLessThanExpression.java     |  48 ---
 .../operator/BeamSqlNotEqualExpression.java     |  48 ---
 .../comparison/BeamSqlCompareExpression.java    |  96 ++++++
 .../comparison/BeamSqlEqualsExpression.java     |  49 +++
 .../BeamSqlGreaterThanExpression.java           |  49 +++
 .../BeamSqlGreaterThanOrEqualsExpression.java   |  49 +++
 .../comparison/BeamSqlIsNotNullExpression.java  |  53 +++
 .../comparison/BeamSqlIsNullExpression.java     |  53 +++
 .../comparison/BeamSqlLessThanExpression.java   |  49 +++
 .../BeamSqlLessThanOrEqualsExpression.java      |  49 +++
 .../comparison/BeamSqlNotEqualsExpression.java  |  49 +++
 .../operator/comparison/package-info.java       |  22 ++
 ...amSqlComparisonOperatorsIntegrationTest.java | 330 +++++++++++++++++++
 .../sql/interpreter/BeamSqlFnExecutorTest.java  |   8 +-
 .../operator/BeamNullExperssionTest.java        |   2 +
 .../operator/BeamSqlCompareExpressionTest.java  |  27 +-
 24 files changed, 884 insertions(+), 510 deletions(-)
----------------------------------------------------------------------