You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/05/04 07:13:10 UTC

[2/3] beam git commit: redesign BeamSqlExpression to execute Calcite SQL expression.

redesign BeamSqlExpression to execute Calcite SQL expression.

Changes:
1. revert BEAM dependency to 0.6.0 to avoid impact of changes in master branch;
2. updates as discussion during review;

refine BeamSqlRowCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/464cc275
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/464cc275
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/464cc275

Branch: refs/heads/DSL_SQL
Commit: 464cc275952305bf51ecfdf056e784441c9c2272
Parents: aa07a1d
Author: mingmxu <mi...@ebay.com>
Authored: Sat Apr 22 22:20:25 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Thu May 4 00:12:11 2017 -0700

----------------------------------------------------------------------
 dsls/sql/pom.xml                                |  47 ++++++-
 .../beam/dsls/sql/example/BeamSqlExample.java   |   5 -
 .../exception/BeamInvalidOperatorException.java |  34 +++++
 .../exception/BeamSqlUnsupportedException.java  |  34 +++++
 .../sql/exception/InvalidFieldException.java    |  34 +++++
 .../beam/dsls/sql/exception/package-info.java   |  23 +++
 .../dsls/sql/interpreter/BeamSQLFnExecutor.java | 140 +++++++++++++++++++
 .../sql/interpreter/BeamSQLSpELExecutor.java    | 127 -----------------
 .../dsls/sql/interpreter/CalciteToSpEL.java     |  81 -----------
 .../operator/BeamSqlAndExpression.java          |  60 ++++++++
 .../operator/BeamSqlCompareExpression.java      |  94 +++++++++++++
 .../operator/BeamSqlEqualExpression.java        |  48 +++++++
 .../interpreter/operator/BeamSqlExpression.java |  62 ++++++++
 .../operator/BeamSqlInputRefExpression.java     |  46 ++++++
 .../operator/BeamSqlIsNotNullExpression.java    |  51 +++++++
 .../operator/BeamSqlIsNullExpression.java       |  51 +++++++
 .../BeamSqlLargerThanEqualExpression.java       |  49 +++++++
 .../operator/BeamSqlLargerThanExpression.java   |  49 +++++++
 .../BeamSqlLessThanEqualExpression.java         |  49 +++++++
 .../operator/BeamSqlLessThanExpression.java     |  49 +++++++
 .../operator/BeamSqlNotEqualExpression.java     |  48 +++++++
 .../operator/BeamSqlOrExpression.java           |  60 ++++++++
 .../interpreter/operator/BeamSqlPrimitive.java  | 102 ++++++++++++++
 .../sql/interpreter/operator/package-info.java  |  22 +++
 .../beam/dsls/sql/planner/BeamQueryPlanner.java |   8 +-
 .../beam/dsls/sql/planner/BeamSqlRunner.java    |  15 +-
 .../planner/BeamSqlUnsupportedException.java    |  38 -----
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |   4 +-
 .../beam/dsls/sql/rel/BeamProjectRel.java       |   4 +-
 .../beam/dsls/sql/schema/BaseBeamTable.java     |   5 -
 .../beam/dsls/sql/schema/BeamSQLRecordType.java |   4 -
 .../dsls/sql/schema/BeamSQLRecordTypeCoder.java |  15 +-
 .../apache/beam/dsls/sql/schema/BeamSQLRow.java |  76 ++++++----
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  48 ++++---
 .../dsls/sql/schema/InvalidFieldException.java  |  34 -----
 .../sql/schema/kafka/BeamKafkaCSVTable.java     |   5 -
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   |   5 -
 .../dsls/sql/transform/BeamSQLFilterFn.java     |   4 -
 .../sql/transform/BeamSQLOutputToConsoleFn.java |   4 -
 .../dsls/sql/transform/BeamSQLProjectFn.java    |   5 -
 .../sql/interpreter/BeamSQLFnExecutorTest.java  | 101 +++++++++++++
 .../interpreter/BeamSQLFnExecutorTestBase.java  |  91 ++++++++++++
 .../operator/BeamNullExperssionTest.java        |  53 +++++++
 .../operator/BeamSqlAndOrExpressionTest.java    |  59 ++++++++
 .../operator/BeamSqlCompareExpressionTest.java  | 108 ++++++++++++++
 .../operator/BeamSqlInputRefExpressionTest.java |  58 ++++++++
 .../operator/BeamSqlPrimitiveTest.java          |  60 ++++++++
 .../beam/dsls/sql/planner/BasePlanner.java      |  28 +++-
 .../sql/planner/BeamPlannerExplainTest.java     |   5 +-
 .../dsls/sql/planner/BeamPlannerSubmitTest.java |   7 +-
 .../dsls/sql/planner/MockedBeamSQLTable.java    |  14 +-
 51 files changed, 1803 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index 21c8def..e2f09be 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -116,7 +116,42 @@
       </plugin>
     </plugins>
   </build>
-  
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-core</artifactId>
+        <version>0.6.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-runners-direct-java</artifactId>
+        <version>0.6.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-kafka</artifactId>
+        <version>0.6.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-runners-core-java</artifactId>
+        <version>0.6.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-common-runner-api</artifactId>
+        <version>0.6.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-runners-core-construction-java</artifactId>
+        <version>0.6.0</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
       <groupId>junit</groupId>
@@ -130,6 +165,12 @@
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-lite</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -142,10 +183,6 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.springframework</groupId>
-      <artifactId>spring-expression</artifactId>
-    </dependency>
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index d32bc59..303835f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -66,11 +66,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
  */
 public class BeamSqlExample implements Serializable {
 
-  /**
-   *
-   */
-  private static final long serialVersionUID = 3673487843555563904L;
-
   public static void main(String[] args) throws Exception {
     BeamSqlRunner runner = new BeamSqlRunner();
     runner.addTable("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders"));

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java
new file mode 100644
index 0000000..281ef89
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.exception;
+
+/**
+ * operation is not supported.
+ *
+ */
+public class BeamInvalidOperatorException extends RuntimeException {
+
+  public BeamInvalidOperatorException(String string) {
+    super(string);
+  }
+
+  public BeamInvalidOperatorException() {
+    super();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java
new file mode 100644
index 0000000..02e843b
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.exception;
+
+/**
+ * Generic exception for un-supported features/functions in BeamSQL.
+ *
+ */
+public class BeamSqlUnsupportedException extends RuntimeException {
+
+  public BeamSqlUnsupportedException(String string) {
+    super(string);
+  }
+
+  public BeamSqlUnsupportedException() {
+    super();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java
new file mode 100644
index 0000000..82ebabe
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.exception;
+
+/**
+ * Exception when the field value and field type is not compatible.
+ *
+ */
+public class InvalidFieldException extends RuntimeException {
+
+  public InvalidFieldException() {
+    super();
+  }
+
+  public InvalidFieldException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java
new file mode 100644
index 0000000..619100c
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Exceptions in BeamSQL.
+ *
+ */
+package org.apache.beam.dsls.sql.exception;

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
new file mode 100644
index 0000000..32e2ffc
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNotNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+/**
+ * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
+ * {@code BeamSQLFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
+ * which can be evaluated against the {@link BeamSQLRow}.
+ *
+ */
+public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
+  protected List<BeamSqlExpression> exps;
+
+  public BeamSQLFnExecutor(BeamRelNode relNode) {
+    this.exps = new ArrayList<>();
+    if (relNode instanceof BeamFilterRel) {
+      BeamFilterRel filterNode = (BeamFilterRel) relNode;
+      RexNode condition = filterNode.getCondition();
+      exps.add(buildExpression(condition));
+    } else if (relNode instanceof BeamProjectRel) {
+      BeamProjectRel projectNode = (BeamProjectRel) relNode;
+      List<RexNode> projects = projectNode.getProjects();
+      for (RexNode rexNode : projects) {
+        exps.add(buildExpression(rexNode));
+      }
+    } else {
+      throw new BeamSqlUnsupportedException(
+          String.format("%s is not supported yet", relNode.getClass().toString()));
+    }
+  }
+
+  /**
+   * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
+   * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
+   */
+  private BeamSqlExpression buildExpression(RexNode rexNode) {
+    if (rexNode instanceof RexLiteral) {
+      RexLiteral node = (RexLiteral) rexNode;
+      return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
+    } else if (rexNode instanceof RexInputRef) {
+      RexInputRef node = (RexInputRef) rexNode;
+      return new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+    } else if (rexNode instanceof RexCall) {
+      RexCall node = (RexCall) rexNode;
+      String opName = node.op.getName();
+      List<BeamSqlExpression> subExps = new ArrayList<>();
+      for (RexNode subNode : node.operands) {
+        subExps.add(buildExpression(subNode));
+      }
+      switch (opName) {
+        case "AND":
+        return new BeamSqlAndExpression(subExps);
+        case "OR":
+          return new BeamSqlOrExpression(subExps);
+
+        case "=":
+          return new BeamSqlEqualExpression(subExps);
+        case "<>=":
+          return new BeamSqlNotEqualExpression(subExps);
+        case ">":
+          return new BeamSqlLargerThanExpression(subExps);
+        case ">=":
+          return new BeamSqlLargerThanEqualExpression(subExps);
+        case "<":
+          return new BeamSqlLessThanExpression(subExps);
+        case "<=":
+          return new BeamSqlLessThanEqualExpression(subExps);
+
+        case "IS NULL":
+          return new BeamSqlIsNullExpression(subExps.get(0));
+        case "IS NOT NULL":
+          return new BeamSqlIsNotNullExpression(subExps.get(0));
+      default:
+        throw new BeamSqlUnsupportedException();
+      }
+    } else {
+      throw new BeamSqlUnsupportedException(
+          String.format("%s is not supported yet", rexNode.getClass().toString()));
+    }
+  }
+
+  @Override
+  public void prepare() {
+  }
+
+  @Override
+  public List<Object> execute(BeamSQLRow inputRecord) {
+    List<Object> results = new ArrayList<>();
+    for (BeamSqlExpression exp : exps) {
+      results.add(exp.evaluate(inputRecord).getValue());
+    }
+    return results;
+  }
+
+  @Override
+  public void close() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
deleted file mode 100644
index 9c9c37f..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.dsls.sql.planner.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.rel.BeamFilterRel;
-import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.springframework.expression.Expression;
-import org.springframework.expression.ExpressionParser;
-import org.springframework.expression.spel.SpelParserConfiguration;
-import org.springframework.expression.spel.standard.SpelExpressionParser;
-import org.springframework.expression.spel.support.StandardEvaluationContext;
-
-/**
- * {@code BeamSQLSpELExecutor} is one implementation, to convert Calcite SQL
- * relational expression to SpEL expression.
- *
- */
-public class BeamSQLSpELExecutor implements BeamSQLExpressionExecutor {
-  /**
-   *
-   */
-  private static final long serialVersionUID = 6777232573390074408L;
-
-  private List<String> spelString;
-  private List<Expression> spelExpressions;
-
-  public BeamSQLSpELExecutor(BeamRelNode relNode) {
-    this.spelString = new ArrayList<>();
-    if (relNode instanceof BeamFilterRel) {
-      String filterSpEL = CalciteToSpEL
-          .rexcall2SpEL((RexCall) ((BeamFilterRel) relNode).getCondition());
-      spelString.add(filterSpEL);
-    } else if (relNode instanceof BeamProjectRel) {
-      spelString.addAll(createProjectExps((BeamProjectRel) relNode));
-      // List<ProjectRule> projectRules =
-      // for (int idx = 0; idx < projectRules.size(); ++idx) {
-      // spelString.add(projectRules.get(idx).getProjectExp());
-      // }
-    } else {
-      throw new BeamSqlUnsupportedException(
-          String.format("%s is not supported yet", relNode.getClass().toString()));
-    }
-  }
-
-  @Override
-  public void prepare() {
-    this.spelExpressions = new ArrayList<>();
-
-    SpelParserConfiguration config = new SpelParserConfiguration(true, true);
-    ExpressionParser parser = new SpelExpressionParser(config);
-    for (String el : spelString) {
-      spelExpressions.add(parser.parseExpression(el));
-    }
-  }
-
-  @Override
-  public List<Object> execute(BeamSQLRow inputRecord) {
-    StandardEvaluationContext inContext = new StandardEvaluationContext();
-    inContext.setVariable("in", inputRecord);
-
-    List<Object> results = new ArrayList<>();
-    for (Expression ep : spelExpressions) {
-      results.add(ep.getValue(inContext));
-    }
-    return results;
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  private List<String> createProjectExps(BeamProjectRel projectRel) {
-    List<String> rules = new ArrayList<>();
-
-    List<RexNode> exps = projectRel.getProjects();
-
-    for (int idx = 0; idx < exps.size(); ++idx) {
-      RexNode node = exps.get(idx);
-      if (node == null) {
-        rules.add("null");
-      }
-
-      if (node instanceof RexLiteral) {
-        rules.add(((RexLiteral) node).getValue() + "");
-      } else {
-        if (node instanceof RexInputRef) {
-          rules.add("#in.getFieldValue(" + ((RexInputRef) node).getIndex() + ")");
-        }
-        if (node instanceof RexCall) {
-          rules.add(CalciteToSpEL.rexcall2SpEL((RexCall) node));
-        }
-      }
-    }
-
-    checkArgument(rules.size() == exps.size(), "missing projects rules after conversion.");
-
-    return rules;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java
deleted file mode 100644
index 6cdc31b..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter;
-
-import com.google.common.base.Joiner;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.dsls.sql.planner.BeamSqlUnsupportedException;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * {@code CalciteToSpEL} is used in {@link BeamSQLSpELExecutor}, to convert a
- * relational expression {@link RexCall} to SpEL expression.
- *
- */
-public class CalciteToSpEL {
-
-  public static String rexcall2SpEL(RexCall cdn) {
-    List<String> parts = new ArrayList<>();
-    for (RexNode subcdn : cdn.operands) {
-      if (subcdn instanceof RexCall) {
-        parts.add(rexcall2SpEL((RexCall) subcdn));
-      } else {
-        parts.add(subcdn instanceof RexInputRef
-            ? "#in.getFieldValue(" + ((RexInputRef) subcdn).getIndex() + ")" : subcdn.toString());
-      }
-    }
-
-    String opName = cdn.op.getName();
-    switch (cdn.op.getClass().getSimpleName()) {
-    case "SqlMonotonicBinaryOperator": // +-*
-    case "SqlBinaryOperator": // > < = >= <= <> OR AND || / .
-      switch (cdn.op.getName().toUpperCase()) {
-      case "AND":
-        return String.format(" ( %s ) ", Joiner.on("&&").join(parts));
-      case "OR":
-        return String.format(" ( %s ) ", Joiner.on("||").join(parts));
-      case "=":
-        return String.format(" ( %s ) ", Joiner.on("==").join(parts));
-      case "<>":
-        return String.format(" ( %s ) ", Joiner.on("!=").join(parts));
-      default:
-        return String.format(" ( %s ) ", Joiner.on(cdn.op.getName().toUpperCase()).join(parts));
-      }
-    case "SqlCaseOperator": // CASE
-      return String.format(" (%s ? %s : %s)", parts.get(0), parts.get(1), parts.get(2));
-    case "SqlCastFunction": // CAST
-      return parts.get(0);
-    case "SqlPostfixOperator":
-      switch (opName.toUpperCase()) {
-      case "IS NULL":
-        return String.format(" null == %s ", parts.get(0));
-      case "IS NOT NULL":
-        return String.format(" null != %s ", parts.get(0));
-      default:
-        throw new BeamSqlUnsupportedException();
-      }
-    default:
-      throw new BeamSqlUnsupportedException();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
new file mode 100644
index 0000000..55473b5
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'AND' operation.
+ */
+public class BeamSqlAndExpression extends BeamSqlExpression {
+
+  private BeamSqlAndExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+  public BeamSqlAndExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  @Override
+  public boolean accept() {
+    for (BeamSqlExpression exp : operands) {
+      // only accept BOOLEAN expression as operand
+      if (!exp.outputType.equals(SqlTypeName.BOOLEAN)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+    boolean result = true;
+    for (BeamSqlExpression exp : operands) {
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+      result = result && expOut.getValue();
+      if (!result) {
+        break;
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
new file mode 100644
index 0000000..bfb798d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@link BeamSqlCompareExpression} is used for compare operations.
+ *
+ * <p>See {@link BeamSqlEqualExpression}, {@link BeamSqlLessThanExpression},
+ * {@link BeamSqlLessThanEqualExpression}, {@link BeamSqlLargerThanExpression},
+ * {@link BeamSqlLargerThanEqualExpression} and {@link BeamSqlNotEqualExpression} for more details.
+ *
+ */
+public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
+
+  private BeamSqlCompareExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlCompareExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * Compare operation must have 2 operands.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 2;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+    Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+    Object rightValue = operands.get(1).evaluate(inputRecord).getValue();
+    switch (operands.get(0).outputType) {
+    case BIGINT:
+    case DECIMAL:
+    case DOUBLE:
+    case FLOAT:
+    case INTEGER:
+    case SMALLINT:
+    case TINYINT:
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+          compare((Number) leftValue, (Number) rightValue));
+    case BOOLEAN:
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+          compare((Boolean) leftValue, (Boolean) rightValue));
+    case VARCHAR:
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+          compare((CharSequence) leftValue, (CharSequence) rightValue));
+    default:
+      throw new BeamSqlUnsupportedException(toString());
+    }
+  }
+
+  /**
+   * Compare between String values, mapping to {@link SqlTypeName#VARCHAR}.
+   */
+  public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue);
+
+  /**
+   * Compare between Boolean values, mapping to {@link SqlTypeName#BOOLEAN}.
+   */
+  public abstract Boolean compare(Boolean leftValue, Boolean rightValue);
+
+  /**
+   * Compare between Number values, including {@link SqlTypeName#BIGINT},
+   * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT},
+   * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and {@link SqlTypeName#TINYINT}.
+   */
+  public abstract Boolean compare(Number leftValue, Number rightValue);
+
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java
new file mode 100644
index 0000000..4bc487b
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+
+/**
+ * {@code BeamSqlExpression} for {@code =} operation.
+ */
+public class BeamSqlEqualExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlEqualExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) == 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    return !(leftValue ^ rightValue);
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() == (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
new file mode 100644
index 0000000..c44795f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite.
+ *
+ * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression}
+ * as its operands, and return a value with type {@link SqlTypeName}.
+ *
+ */
+public abstract class BeamSqlExpression implements Serializable{
+  protected List<BeamSqlExpression> operands;
+  protected SqlTypeName outputType;
+
+  protected BeamSqlExpression(){}
+
+  public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    this.operands = operands;
+    this.outputType = outputType;
+  }
+
+  /**
+   * assertion to make sure the input and output are supported in this expression.
+   */
+  public abstract boolean accept();
+
+  /**
+   * Apply input record {@link BeamSQLRow} to this expression,
+   * the output value is wrapped with {@link BeamSqlPrimitive}.
+   */
+  public abstract BeamSqlPrimitive evaluate(BeamSQLRow inputRecord);
+
+  public List<BeamSqlExpression> getOperands() {
+    return operands;
+  }
+
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
new file mode 100644
index 0000000..612108f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * An primitive operation for direct field extraction.
+ */
+public class BeamSqlInputRefExpression extends BeamSqlExpression{
+  private int inputRef;
+
+  public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) {
+    super(null, sqlTypeName);
+    this.inputRef = inputRef;
+  }
+
+  @Override
+  public boolean accept() {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    return BeamSqlPrimitive.of(outputType, inputRecord.getFieldValue(inputRef));
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
new file mode 100644
index 0000000..784584e
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'IS NOT NULL' operation.
+ */
+public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
+
+  private BeamSqlIsNotNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlIsNotNullExpression(BeamSqlExpression operand){
+    this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * only one operand is required.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+    Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
new file mode 100644
index 0000000..b09ddbf
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'IS NULL' operation.
+ */
+public class BeamSqlIsNullExpression extends BeamSqlExpression {
+
+  private BeamSqlIsNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlIsNullExpression(BeamSqlExpression operand){
+    this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * only one operand is required.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+    Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java
new file mode 100644
index 0000000..d78c020
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+
+/**
+ * {@code BeamSqlExpression} for {@code >=} operation.
+ */
+public class BeamSqlLargerThanEqualExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLargerThanEqualExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) >= 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new BeamInvalidOperatorException(">= is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() >= (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java
new file mode 100644
index 0000000..0b0d6f1
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+
+/**
+ * {@code BeamSqlExpression} for {@code >} operation.
+ */
+public class BeamSqlLargerThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLargerThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) > 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new BeamInvalidOperatorException("> is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() > (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java
new file mode 100644
index 0000000..b6f7c9a
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+
+/**
+ * {@code BeamSqlExpression} for {@code <=} operation.
+ */
+public class BeamSqlLessThanEqualExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanEqualExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) <= 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new BeamInvalidOperatorException("<= is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() <= (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java
new file mode 100644
index 0000000..216a621
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+
+/**
+ * {@code BeamSqlExpression} for {@code <} operation.
+ */
+public class BeamSqlLessThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) < 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new BeamInvalidOperatorException("< is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() < (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java
new file mode 100644
index 0000000..2b093bf
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+
+/**
+ * {@code BeamSqlExpression} for {@code <>} operation.
+ */
+public class BeamSqlNotEqualExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlNotEqualExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) != 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    return leftValue ^ rightValue;
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() != (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
new file mode 100644
index 0000000..4d07af8
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'OR' operation.
+ */
+public class BeamSqlOrExpression extends BeamSqlExpression {
+
+  private BeamSqlOrExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+  public BeamSqlOrExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  @Override
+  public boolean accept() {
+    for (BeamSqlExpression exp : operands) {
+      // only accept BOOLEAN expression as operand
+      if (!exp.outputType.equals(SqlTypeName.BOOLEAN)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+    boolean result = false;
+    for (BeamSqlExpression exp : operands) {
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+        result = result || expOut.getValue();
+        if (result) {
+          break;
+        }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
new file mode 100644
index 0000000..71852ff
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
+ * It holds the value, and return it directly during {@link #evaluate(BeamSQLRow)}.
+ *
+ */
+public class BeamSqlPrimitive<T> extends BeamSqlExpression{
+  private SqlTypeName outputType;
+  private T value;
+
+  private BeamSqlPrimitive() {
+  }
+
+  private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * A builder function to create from Type and value directly.
+   */
+  public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){
+    BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<T>();
+    exp.outputType = outputType;
+    exp.value = value;
+    if (!exp.accept()) {
+      throw new BeamInvalidOperatorException(
+          String.format("value [%s] doesn't match type [%s].", value, outputType));
+    }
+    return exp;
+  }
+
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+
+  public T getValue() {
+    return value;
+  }
+
+  @Override
+  public boolean accept() {
+    if (value == null) {
+      return true;
+    }
+
+    switch (outputType) {
+    case BIGINT:
+      return value instanceof Long;
+    case DECIMAL:
+      return value instanceof BigDecimal;
+    case DOUBLE:
+      return value instanceof Double;
+    case FLOAT:
+      return value instanceof Float;
+    case INTEGER:
+      return value instanceof Integer;
+    case SMALLINT:
+      return value instanceof Short;
+    case TINYINT:
+      return value instanceof Byte;
+    case BOOLEAN:
+      return value instanceof Boolean;
+    case CHAR:
+      return value instanceof Character;
+    case VARCHAR:
+      return value instanceof String;
+    default:
+      throw new BeamSqlUnsupportedException(outputType.name());
+    }
+  }
+
+  @Override
+  public BeamSqlPrimitive<T> evaluate(BeamSQLRow inputRecord) {
+    return this;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java
new file mode 100644
index 0000000..9b0a9a7
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for operators in {@link org.apache.calcite.sql.fun.SqlStdOperatorTable}.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index aac86d6..935dae7 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -122,7 +122,13 @@ public class BeamQueryPlanner {
    */
   public BeamRelNode convertToBeamRel(String sqlStatement)
       throws ValidationException, RelConversionException, SqlParseException {
-    return (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
+    BeamRelNode beamRelNode;
+    try {
+      beamRelNode = (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
+    } finally {
+      planner.close();
+    }
+    return beamRelNode;
   }
 
   private RelNode validateAndConvert(SqlNode sqlNode)

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
index e457e80..708c507 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
@@ -36,17 +36,17 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class BeamSqlRunner implements Serializable {
-  /**
-   *
-   */
-  private static final long serialVersionUID = -4708693435115005182L;
-
   private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRunner.class);
 
   private SchemaPlus schema = Frameworks.createRootSchema(true);
 
   private BeamQueryPlanner planner = new BeamQueryPlanner(schema);
 
+  public BeamSqlRunner() {
+    //disable assertions in Calcite.
+    ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false);
+  }
+
   /**
    * Add a schema.
    *
@@ -70,7 +70,6 @@ public class BeamSqlRunner implements Serializable {
    */
   public void submitQuery(String sqlString) throws Exception {
     planner.submitToRun(sqlString);
-    planner.planner.close();
   }
 
   /**
@@ -78,12 +77,10 @@ public class BeamSqlRunner implements Serializable {
    *
    */
   public String explainQuery(String sqlString)
-      throws ValidationException, RelConversionException, SqlParseException {
+    throws ValidationException, RelConversionException, SqlParseException {
     BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
     String beamPlan = RelOptUtil.toString(exeTree);
     System.out.println(String.format("beamPlan>\n%s", beamPlan));
-
-    planner.planner.close();
     return beamPlan;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
deleted file mode 100644
index 7cb5243..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-/**
- * Generic exception for un-supported operations.
- *
- */
-public class BeamSqlUnsupportedException extends RuntimeException {
-  /**
-   *
-   */
-  private static final long serialVersionUID = 3445015747629217342L;
-
-  public BeamSqlUnsupportedException(String string) {
-    super(string);
-  }
-
-  public BeamSqlUnsupportedException() {
-    super();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 10dd1be..477be5a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql.rel;
 
 import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
 import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -58,7 +58,7 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
 
     PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
 
-    BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
+    BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
 
     PCollection<BeamSQLRow> projectStream = upstream.apply(stageName,
         ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index dd731f8..7e27ab3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.rel;
 import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
 import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
@@ -69,7 +69,7 @@ public class BeamProjectRel extends Project implements BeamRelNode {
 
     PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
 
-    BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
+    BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
 
     PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
         .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
index 81829e9..2ecfa38 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -37,11 +37,6 @@ import org.apache.calcite.schema.Statistics;
  * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
  */
 public abstract class BaseBeamTable implements ScannableTable, Serializable {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = -1262988061830914193L;
   private RelDataType relDataType;
 
   protected BeamSQLRecordType beamSqlRecordType;

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
index 661b155..e4013bc 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
@@ -30,10 +30,6 @@ import org.apache.calcite.sql.type.SqlTypeName;
  */
 //@DefaultCoder(BeamSQLRecordTypeCoder.class)
 public class BeamSQLRecordType implements Serializable {
-  /**
-   *
-   */
-  private static final long serialVersionUID = -5318734648766104712L;
   private List<String> fieldsName = new ArrayList<>();
   private List<SqlTypeName> fieldsType = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
index ec330f1..b88a195 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
@@ -54,35 +54,34 @@ public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> {
     for (SqlTypeName fieldType : value.getFieldsType()) {
       stringCoder.encode(fieldType.name(), outStream, nested);
     }
-    outStream.flush();
+    //add a dummy field to indicate the end of record
+    intCoder.encode(value.size(), outStream, context);
   }
 
   @Override
   public BeamSQLRecordType decode(InputStream inStream,
       org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
     BeamSQLRecordType typeRecord = new BeamSQLRecordType();
-    Context nested = context.nested();
-    int size = intCoder.decode(inStream, nested);
+    int size = intCoder.decode(inStream, context.nested());
     for (int idx = 0; idx < size; ++idx) {
-      typeRecord.getFieldsName().add(stringCoder.decode(inStream, nested));
+      typeRecord.getFieldsName().add(stringCoder.decode(inStream, context.nested()));
     }
     for (int idx = 0; idx < size; ++idx) {
-      typeRecord.getFieldsType().add(SqlTypeName.valueOf(stringCoder.decode(inStream, nested)));
+      typeRecord.getFieldsType().add(
+          SqlTypeName.valueOf(stringCoder.decode(inStream, context.nested())));
     }
+    intCoder.decode(inStream, context);
     return typeRecord;
   }
 
   @Override
   public List<? extends Coder<?>> getCoderArguments() {
-    // TODO Auto-generated method stub
     return null;
   }
 
   @Override
   public void verifyDeterministic()
       throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-    // TODO Auto-generated method stub
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
index b65e23b..f9dab8a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import org.apache.beam.dsls.sql.exception.InvalidFieldException;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -28,10 +29,6 @@ import org.apache.calcite.sql.type.SqlTypeName;
  *
  */
 public class BeamSQLRow implements Serializable {
-  /**
-   *
-   */
-  private static final long serialVersionUID = 4569220242480160895L;
 
   private List<Integer> nullFields = new ArrayList<>();
   private List<Object> dataValues;
@@ -42,12 +39,15 @@ public class BeamSQLRow implements Serializable {
     this.dataValues = new ArrayList<>();
     for (int idx = 0; idx < dataType.size(); ++idx) {
       dataValues.add(null);
+      nullFields.add(idx);
     }
   }
 
   public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) {
-    this.dataValues = dataValues;
-    this.dataType = dataType;
+    this(dataType);
+    for (int idx = 0; idx < dataValues.size(); ++idx) {
+      addField(idx, dataValues.get(idx));
+    }
   }
 
   public void addField(String fieldName, Object fieldValue) {
@@ -56,19 +56,29 @@ public class BeamSQLRow implements Serializable {
 
   public void addField(int index, Object fieldValue) {
     if (fieldValue == null) {
-      dataValues.set(index, fieldValue);
-      if (!nullFields.contains(index)) {
-        nullFields.add(index);
-      }
       return;
+    } else {
+      if (nullFields.contains(index)) {
+        nullFields.remove(nullFields.indexOf(index));
+      }
     }
 
     SqlTypeName fieldType = dataType.getFieldsType().get(index);
     switch (fieldType) {
     case INTEGER:
+      if (!(fieldValue instanceof Integer)) {
+        throw new InvalidFieldException(
+            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+      }
+      break;
     case SMALLINT:
+      if (!(fieldValue instanceof Short)) {
+        throw new InvalidFieldException(
+            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+      }
+      break;
     case TINYINT:
-      if (!(fieldValue instanceof Integer)) {
+      if (!(fieldValue instanceof Byte)) {
         throw new InvalidFieldException(
             String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
       }
@@ -97,24 +107,24 @@ public class BeamSQLRow implements Serializable {
             String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
       }
       break;
-    case TIME:
-    case TIMESTAMP:
-      if (!(fieldValue instanceof Date)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
     default:
       throw new UnsupportedDataTypeException(fieldType);
     }
     dataValues.set(index, fieldValue);
   }
 
+  public short getShort(int idx) {
+    return (Short) getFieldValue(idx);
+  }
 
   public int getInteger(int idx) {
     return (Integer) getFieldValue(idx);
   }
 
+  public float getFloat(int idx) {
+    return (Float) getFieldValue(idx);
+  }
+
   public double getDouble(int idx) {
     return (Double) getFieldValue(idx);
   }
@@ -145,48 +155,52 @@ public class BeamSQLRow implements Serializable {
 
     switch (fieldType) {
     case INTEGER:
+      if (!(fieldValue instanceof Integer)) {
+        throw new InvalidFieldException(
+            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+      } else {
+        return fieldValue;
+      }
     case SMALLINT:
+      if (!(fieldValue instanceof Short)) {
+        throw new InvalidFieldException(
+            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+      } else {
+        return fieldValue;
+      }
     case TINYINT:
-      if (!(fieldValue instanceof Integer)) {
+      if (!(fieldValue instanceof Byte)) {
         throw new InvalidFieldException(
             String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
       } else {
-        return Integer.valueOf(fieldValue.toString());
+        return fieldValue;
       }
     case DOUBLE:
       if (!(fieldValue instanceof Double)) {
         throw new InvalidFieldException(
             String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
       } else {
-        return Double.valueOf(fieldValue.toString());
+        return fieldValue;
       }
     case BIGINT:
       if (!(fieldValue instanceof Long)) {
         throw new InvalidFieldException(
             String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
       } else {
-        return Long.valueOf(fieldValue.toString());
+        return fieldValue;
       }
     case FLOAT:
       if (!(fieldValue instanceof Float)) {
         throw new InvalidFieldException(
             String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
       } else {
-        return Float.valueOf(fieldValue.toString());
+        return fieldValue;
       }
     case VARCHAR:
       if (!(fieldValue instanceof String)) {
         throw new InvalidFieldException(
             String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
       } else {
-        return fieldValue.toString();
-      }
-    case TIME:
-    case TIMESTAMP:
-      if (!(fieldValue instanceof Date)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      } else {
         return fieldValue;
       }
     default: