You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:16 UTC

[53/59] beam git commit: move all implementation classes/packages into impl package

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
new file mode 100644
index 0000000..b80e045
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.utils;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utility methods for converting between {@link Types java.sql.Types} integer codes,
+ * Calcite {@link SqlTypeName}s and {@link BeamSqlRowType} schemas.
+ *
+ * <p>Only the types registered in the static initializer of this class are supported.
+ */
+public class CalciteUtils {
+  // Forward mapping: java.sql.Types code -> Calcite type name.
+  private static final Map<Integer, SqlTypeName> JAVA_TO_CALCITE_MAPPING = new HashMap<>();
+  // Reverse mapping, derived from the forward one at class-initialization time.
+  private static final Map<SqlTypeName, Integer> CALCITE_TO_JAVA_MAPPING = new HashMap<>();
+  static {
+    JAVA_TO_CALCITE_MAPPING.put(Types.TINYINT, SqlTypeName.TINYINT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.SMALLINT, SqlTypeName.SMALLINT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.INTEGER, SqlTypeName.INTEGER);
+    JAVA_TO_CALCITE_MAPPING.put(Types.BIGINT, SqlTypeName.BIGINT);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.FLOAT, SqlTypeName.FLOAT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.DOUBLE, SqlTypeName.DOUBLE);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.DECIMAL, SqlTypeName.DECIMAL);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR);
+    JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE);
+    JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME);
+    JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.BOOLEAN, SqlTypeName.BOOLEAN);
+
+    // The forward mapping is one-to-one, so inverting it loses no entries.
+    for (Map.Entry<Integer, SqlTypeName> pair : JAVA_TO_CALCITE_MAPPING.entrySet()) {
+      CALCITE_TO_JAVA_MAPPING.put(pair.getValue(), pair.getKey());
+    }
+  }
+
+  /**
+   * Get the corresponding {@code SqlTypeName} for an integer sql type.
+   *
+   * @param type a {@link Types java.sql.Types} constant
+   * @return the mapped {@code SqlTypeName}, or {@code null} if {@code type} is not registered
+   */
+  public static SqlTypeName toCalciteType(int type) {
+    return JAVA_TO_CALCITE_MAPPING.get(type);
+  }
+
+  /**
+   * Get the integer sql type from Calcite {@code SqlTypeName}.
+   *
+   * @param typeName the Calcite type name to look up
+   * @return the mapped {@link Types java.sql.Types} constant, or {@code null} if
+   *     {@code typeName} is not registered
+   */
+  public static Integer toJavaType(SqlTypeName typeName) {
+    return CALCITE_TO_JAVA_MAPPING.get(typeName);
+  }
+
+  /**
+   * Get the {@code SqlTypeName} for the specified column of a table.
+   *
+   * @param schema the row type holding the column types
+   * @param index position of the column in {@code schema.getFieldsType()}
+   * @return the mapped {@code SqlTypeName}, or {@code null} if the column type is not registered
+   */
+  public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) {
+    return toCalciteType(schema.getFieldsType().get(index));
+  }
+
+  /**
+   * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
+   *
+   * @param tableInfo the Calcite row type whose fields are converted in order
+   * @return a row type with the same field names and the mapped integer sql types
+   * @throws UnsupportedOperationException if a field has a Calcite type with no registered
+   *     integer sql type
+   */
+  public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) {
+    List<String> fieldNames = new ArrayList<>();
+    List<Integer> fieldTypes = new ArrayList<>();
+    for (RelDataTypeField f : tableInfo.getFieldList()) {
+      SqlTypeName calciteType = f.getType().getSqlTypeName();
+      Integer javaType = toJavaType(calciteType);
+      // Fail here, naming the offending field, instead of storing a null type that
+      // would only surface later as an unrelated NullPointerException.
+      if (javaType == null) {
+        throw new UnsupportedOperationException(
+            String.format("Field '%s' has unsupported type %s", f.getName(), calciteType));
+      }
+      fieldNames.add(f.getName());
+      fieldTypes.add(javaType);
+    }
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
+  }
+
+  /**
+   * Create an instance of {@code RelDataType} so it can be used to create a table.
+   *
+   * <p>The conversion happens lazily, when the returned {@code RelProtoDataType} is applied
+   * to a type factory; an {@link UnsupportedOperationException} is thrown at that point if a
+   * field type has no registered Calcite type.
+   *
+   * @param that the row type whose field names and types are converted in order
+   * @return a prototype that builds the equivalent Calcite row type
+   */
+  public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) {
+    return new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a) {
+        RelDataTypeFactory.FieldInfoBuilder builder = a.builder();
+        for (int idx = 0; idx < that.getFieldsName().size(); ++idx) {
+          String fieldName = that.getFieldsName().get(idx);
+          SqlTypeName calciteType = toCalciteType(that.getFieldsType().get(idx));
+          // Do not hand a null type name to the Calcite builder; report the field instead.
+          if (calciteType == null) {
+            throw new UnsupportedOperationException(
+                String.format("Field '%s' has unsupported sql type %s",
+                    fieldName, that.getFieldsType().get(idx)));
+          }
+          builder.add(fieldName, calciteType);
+        }
+        return builder.build();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/package-info.java
new file mode 100644
index 0000000..b00ed0c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.utils;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java
deleted file mode 100644
index 28f83e4..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-
-/**
- * {@code BeamSqlExpressionExecutor} fills the gap between relational
- * expressions in Calcite SQL and executable code.
- *
- * <p>The interface extends {@link Serializable}, so implementations must be serializable.
- */
-public interface BeamSqlExpressionExecutor extends Serializable {
-
-  /**
-   * Invoked before data processing.
-   */
-  void prepare();
-
-  /**
-   * Applies the transformation to the input record {@link BeamSqlRow}.
-   *
-   * @param inputRow the record to evaluate against
-   * @return the values produced for {@code inputRow}
-   */
-  List<Object> execute(BeamSqlRow inputRow);
-
-  /**
-   * NOTE(review): undocumented in the original. Presumably the counterpart of
-   * {@link #prepare()}, invoked once data processing has finished; confirm against callers.
-   */
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java
deleted file mode 100644
index 3084cd5..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCaseExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCastExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlReinterpretExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlUdfExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowEndExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowStartExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlExtractExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlNotExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAbsExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAcosExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAsinExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAtan2Expression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAtanExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCeilExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCosExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCotExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlDegreesExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlExpExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlFloorExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlLnExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlLogExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlPiExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlPowerExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRadiansExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRandExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRandIntegerExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRoundExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlSignExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlSinExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlTanExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlTruncateExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlConcatExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlInitCapExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlLowerExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlOverlayExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlPositionExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlSubstringExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlTrimExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlUpperExpression;
-import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
-import org.apache.calcite.util.NlsString;
-
-/**
- * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
- * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a list of
- * {@link BeamSqlExpression}s, which can be evaluated against the {@link BeamSqlRow}.
- *
- */
-public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
-  // Expressions evaluated, in order, by execute(); populated once in the constructor.
-  protected List<BeamSqlExpression> exps;
-
-  /**
-   * Builds the expressions to execute from a relational node.
-   *
-   * <p>A {@link BeamFilterRel} contributes a single expression built from its condition;
-   * a {@link BeamProjectRel} contributes one expression per projection, in projection order.
-   *
-   * @param relNode the node to convert
-   * @throws UnsupportedOperationException if {@code relNode} is neither a
-   *     {@link BeamFilterRel} nor a {@link BeamProjectRel}
-   */
-  public BeamSqlFnExecutor(BeamRelNode relNode) {
-    this.exps = new ArrayList<>();
-    if (relNode instanceof BeamFilterRel) {
-      BeamFilterRel filterNode = (BeamFilterRel) relNode;
-      RexNode condition = filterNode.getCondition();
-      exps.add(buildExpression(condition));
-    } else if (relNode instanceof BeamProjectRel) {
-      BeamProjectRel projectNode = (BeamProjectRel) relNode;
-      List<RexNode> projects = projectNode.getProjects();
-      for (RexNode rexNode : projects) {
-        exps.add(buildExpression(rexNode));
-      }
-    } else {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported yet!", relNode.getClass().toString()));
-    }
-  }
-
-  /**
-   * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
-   * and represents each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
-   *
-   * <p>{@link RexLiteral}s become {@link BeamSqlPrimitive}s, {@link RexInputRef}s become
-   * {@link BeamSqlInputRefExpression}s, and {@link RexCall}s are dispatched on the operator
-   * name; an operator with an unknown name is accepted only if it is a
-   * {@link SqlUserDefinedFunction}.
-   *
-   * <p>Branches that end in {@code break} have their result checked with
-   * {@code accept()} before it is returned. The literal branch and the
-   * date-function cases ({@code Reinterpret}, {@code CEIL}, {@code FLOOR}, {@code EXTRACT},
-   * {@code CURRENT_*}/{@code LOCAL*}) {@code return} directly and therefore skip that check.
-   *
-   * @param rexNode the Calcite expression node to convert
-   * @return the equivalent {@link BeamSqlExpression}
-   * @throws UnsupportedOperationException if the node class or operator is not handled
-   * @throws IllegalStateException if a literal's storage type and declared type do not match,
-   *     or the built expression does not accept its operands
-   */
-  static BeamSqlExpression buildExpression(RexNode rexNode) {
-    BeamSqlExpression ret = null;
-    if (rexNode instanceof RexLiteral) {
-      RexLiteral node = (RexLiteral) rexNode;
-      SqlTypeName type = node.getTypeName();
-      Object value = node.getValue();
-
-      if (SqlTypeName.CHAR_TYPES.contains(type)
-          && node.getValue() instanceof NlsString) {
-        // NlsString is not serializable, we need to convert
-        // it to string explicitly.
-        return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
-      } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
-        // does this actually make sense?
-        // Calcite actually treat Calendar as the java type of Date Literal
-        return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
-      } else {
-        // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different
-        // e.g. sql: "select 1"
-        // here the literal 1 will be parsed as a RexLiteral where:
-        //     node.getType().getSqlTypeName() = INTEGER (the display type)
-        //     node.getSqlTypeName() = DECIMAL (the actual internal storage format)
-        // So we need to do a convert here.
-        // check RexBuilder#makeLiteral for more information.
-        SqlTypeName realType = node.getType().getSqlTypeName();
-        Object realValue = value;
-        if (type == SqlTypeName.DECIMAL) {
-          BigDecimal rawValue = (BigDecimal) value;
-          // Narrow the BigDecimal storage value to the Java type of the declared type.
-          switch (realType) {
-            case TINYINT:
-              realValue = (byte) rawValue.intValue();
-              break;
-            case SMALLINT:
-              realValue = (short) rawValue.intValue();
-              break;
-            case INTEGER:
-              realValue = rawValue.intValue();
-              break;
-            case BIGINT:
-              realValue = rawValue.longValue();
-              break;
-            case DECIMAL:
-              realValue = rawValue;
-              break;
-            default:
-              throw new IllegalStateException("type/realType mismatch: "
-                  + type + " VS " + realType);
-          }
-        } else if (type == SqlTypeName.DOUBLE) {
-          // NOTE(review): assumes Calcite stores DOUBLE literal values as java.lang.Double;
-          // if RexLiteral holds a BigDecimal here this cast fails - confirm against RexLiteral.
-          Double rawValue = (Double) value;
-          if (realType == SqlTypeName.FLOAT) {
-            realValue = rawValue.floatValue();
-          }
-        }
-        return BeamSqlPrimitive.of(realType, realValue);
-      }
-    } else if (rexNode instanceof RexInputRef) {
-      RexInputRef node = (RexInputRef) rexNode;
-      ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
-    } else if (rexNode instanceof RexCall) {
-      RexCall node = (RexCall) rexNode;
-      String opName = node.op.getName();
-      // Convert the operands first (depth-first), then wrap them in the operator expression.
-      List<BeamSqlExpression> subExps = new ArrayList<>();
-      for (RexNode subNode : node.getOperands()) {
-        subExps.add(buildExpression(subNode));
-      }
-      switch (opName) {
-        // logical operators
-        case "AND":
-          ret = new BeamSqlAndExpression(subExps);
-          break;
-        case "OR":
-          ret = new BeamSqlOrExpression(subExps);
-          break;
-        case "NOT":
-          ret = new BeamSqlNotExpression(subExps);
-          break;
-        case "=":
-          ret = new BeamSqlEqualsExpression(subExps);
-          break;
-        case "<>":
-          ret = new BeamSqlNotEqualsExpression(subExps);
-          break;
-        case ">":
-          ret = new BeamSqlGreaterThanExpression(subExps);
-          break;
-        case ">=":
-          ret = new BeamSqlGreaterThanOrEqualsExpression(subExps);
-          break;
-        case "<":
-          ret = new BeamSqlLessThanExpression(subExps);
-          break;
-        case "<=":
-          ret = new BeamSqlLessThanOrEqualsExpression(subExps);
-          break;
-
-        // arithmetic operators
-        case "+":
-          ret = new BeamSqlPlusExpression(subExps);
-          break;
-        case "-":
-          ret = new BeamSqlMinusExpression(subExps);
-          break;
-        case "*":
-          ret = new BeamSqlMultiplyExpression(subExps);
-          break;
-        case "/":
-        case "/INT":
-          ret = new BeamSqlDivideExpression(subExps);
-          break;
-        case "MOD":
-          ret = new BeamSqlModExpression(subExps);
-          break;
-
-        // math functions
-        case "ABS":
-          ret = new BeamSqlAbsExpression(subExps);
-          break;
-        case "ROUND":
-          ret = new BeamSqlRoundExpression(subExps);
-          break;
-        case "LN":
-          ret = new BeamSqlLnExpression(subExps);
-          break;
-        case "LOG10":
-          ret = new BeamSqlLogExpression(subExps);
-          break;
-        case "EXP":
-          ret = new BeamSqlExpExpression(subExps);
-          break;
-        case "ACOS":
-          ret = new BeamSqlAcosExpression(subExps);
-          break;
-        case "ASIN":
-          ret = new BeamSqlAsinExpression(subExps);
-          break;
-        case "ATAN":
-          ret = new BeamSqlAtanExpression(subExps);
-          break;
-        case "COT":
-          ret = new BeamSqlCotExpression(subExps);
-          break;
-        case "DEGREES":
-          ret = new BeamSqlDegreesExpression(subExps);
-          break;
-        case "RADIANS":
-          ret = new BeamSqlRadiansExpression(subExps);
-          break;
-        case "COS":
-          ret = new BeamSqlCosExpression(subExps);
-          break;
-        case "SIN":
-          ret = new BeamSqlSinExpression(subExps);
-          break;
-        case "TAN":
-          ret = new BeamSqlTanExpression(subExps);
-          break;
-        case "SIGN":
-          ret = new BeamSqlSignExpression(subExps);
-          break;
-        case "POWER":
-          ret = new BeamSqlPowerExpression(subExps);
-          break;
-        case "PI":
-          ret = new BeamSqlPiExpression();
-          break;
-        case "ATAN2":
-          ret = new BeamSqlAtan2Expression(subExps);
-          break;
-        case "TRUNCATE":
-          ret = new BeamSqlTruncateExpression(subExps);
-          break;
-        case "RAND":
-          ret = new BeamSqlRandExpression(subExps);
-          break;
-        case "RAND_INTEGER":
-          ret = new BeamSqlRandIntegerExpression(subExps);
-          break;
-
-        // string operators
-        case "||":
-          ret = new BeamSqlConcatExpression(subExps);
-          break;
-        case "POSITION":
-          ret = new BeamSqlPositionExpression(subExps);
-          break;
-        case "CHAR_LENGTH":
-        case "CHARACTER_LENGTH":
-          ret = new BeamSqlCharLengthExpression(subExps);
-          break;
-        case "UPPER":
-          ret = new BeamSqlUpperExpression(subExps);
-          break;
-        case "LOWER":
-          ret = new BeamSqlLowerExpression(subExps);
-          break;
-        case "TRIM":
-          ret = new BeamSqlTrimExpression(subExps);
-          break;
-        case "SUBSTRING":
-          ret = new BeamSqlSubstringExpression(subExps);
-          break;
-        case "OVERLAY":
-          ret = new BeamSqlOverlayExpression(subExps);
-          break;
-        case "INITCAP":
-          ret = new BeamSqlInitCapExpression(subExps);
-          break;
-
-        // date functions
-        // (these cases return directly, so the accept() check after the switch is skipped)
-        case "Reinterpret":
-          return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName());
-        case "CEIL":
-          // CEIL/FLOOR are shared by numbers and dates; pick by the call's result type.
-          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
-            return new BeamSqlCeilExpression(subExps);
-          } else {
-            return new BeamSqlDateCeilExpression(subExps);
-          }
-        case "FLOOR":
-          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
-            return new BeamSqlFloorExpression(subExps);
-          } else {
-            return new BeamSqlDateFloorExpression(subExps);
-          }
-        case "EXTRACT_DATE":
-        case "EXTRACT":
-          return new BeamSqlExtractExpression(subExps);
-
-        case "LOCALTIME":
-        case "CURRENT_TIME":
-          return new BeamSqlCurrentTimeExpression(subExps);
-
-        case "CURRENT_TIMESTAMP":
-        case "LOCALTIMESTAMP":
-          return new BeamSqlCurrentTimestampExpression(subExps);
-
-        case "CURRENT_DATE":
-          return new BeamSqlCurrentDateExpression();
-
-
-        case "CASE":
-          ret = new BeamSqlCaseExpression(subExps);
-          break;
-        case "CAST":
-          ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName());
-          break;
-
-        case "IS NULL":
-          ret = new BeamSqlIsNullExpression(subExps.get(0));
-          break;
-        case "IS NOT NULL":
-          ret = new BeamSqlIsNotNullExpression(subExps.get(0));
-          break;
-
-        // window functions
-        case "HOP":
-        case "TUMBLE":
-        case "SESSION":
-          ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
-          break;
-        case "HOP_START":
-        case "TUMBLE_START":
-        case "SESSION_START":
-          ret = new BeamSqlWindowStartExpression();
-          break;
-        case "HOP_END":
-        case "TUMBLE_END":
-        case "SESSION_END":
-          ret = new BeamSqlWindowEndExpression();
-          break;
-        default:
-          //handle UDF
-          if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
-            SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
-            ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
-            ret = new BeamSqlUdfExpression(fn.method, subExps,
-              ((RexCall) rexNode).type.getSqlTypeName());
-        } else {
-          throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet!");
-        }
-      }
-    } else {
-      throw new UnsupportedOperationException(
-          String.format("%s is not supported yet!", rexNode.getClass().toString()));
-    }
-
-    // Reject expressions whose operands do not satisfy the expression's own accept() check.
-    if (ret != null && !ret.accept()) {
-      throw new IllegalStateException(ret.getClass().getSimpleName()
-          + " does not accept the operands.(" + rexNode + ")");
-    }
-
-    return ret;
-  }
-
-  /** No-op: this executor needs no preparation. */
-  @Override
-  public void prepare() {
-  }
-
-  /**
-   * Evaluates every expression against {@code inputRow}.
-   *
-   * @param inputRow the record to evaluate against
-   * @return the evaluated values, one per expression, in the order the expressions were built
-   */
-  @Override
-  public List<Object> execute(BeamSqlRow inputRow) {
-    List<Object> results = new ArrayList<>();
-    for (BeamSqlExpression exp : exps) {
-      results.add(exp.evaluate(inputRow).getValue());
-    }
-    return results;
-  }
-
-  /** No-op: this executor holds nothing that needs releasing. */
-  @Override
-  public void close() {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java
deleted file mode 100644
index bfbb33e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- *  {@code BeamSqlCaseExpression} represents CASE, NULLIF, COALESCE in SQL.
- *
- * <p>Operands are laid out as {@code when, then, [when, then, ...] else}: an odd number of
- * operands where every even index holds a condition and the last one holds the fallback.
- */
-public class BeamSqlCaseExpression extends BeamSqlExpression {
-  /**
-   * @param operands the {@code when}/{@code then} pairs followed by the {@code else} operand;
-   *     must not be empty, because the output type is read from the last operand
-   */
-  public BeamSqlCaseExpression(List<BeamSqlExpression> operands) {
-    // the return type of CASE is the type of the `else` condition
-    super(operands, operands.get(operands.size() - 1).getOutputType());
-  }
-
-  /**
-   * Checks the operand layout.
-   *
-   * @return {@code true} only if the operand count is odd, every {@code when} operand is of
-   *     type BOOLEAN and every {@code then} operand has the same type as the output
-   */
-  @Override public boolean accept() {
-    // `when`-`then` pair + `else`
-    if (operands.size() % 2 != 1) {
-      return false;
-    }
-
-    for (int i = 0; i < operands.size() - 1; i += 2) {
-      if (opType(i) != SqlTypeName.BOOLEAN) {
-        return false;
-      } else if (opType(i + 1) != outputType) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * Returns the {@code then} value of the first {@code when} condition that evaluates to
-   * {@code TRUE}, or the {@code else} value if none does.
-   *
-   * @param inputRow the record the operands are evaluated against
-   * @return the selected value wrapped in a {@link BeamSqlPrimitive} of the output type
-   */
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    for (int i = 0; i < operands.size() - 1; i += 2) {
-      // In SQL a condition that evaluates to NULL is not satisfied. Comparing against
-      // Boolean.TRUE avoids the NullPointerException that auto-unboxing a null
-      // condition value would raise.
-      Object condition = opValueEvaluated(i, inputRow);
-      if (Boolean.TRUE.equals(condition)) {
-        return BeamSqlPrimitive.of(
-            outputType,
-            opValueEvaluated(i + 1, inputRow)
-        );
-      }
-    }
-    return BeamSqlPrimitive.of(outputType,
-        opValueEvaluated(operands.size() - 1, inputRow));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java
deleted file mode 100644
index 08abcc6..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.DateTimeFormatterBuilder;
-import org.joda.time.format.DateTimeParser;
-
-/**
- * Base class to support 'CAST' operations for all {@link SqlTypeName}.
- */
-public class BeamSqlCastExpression extends BeamSqlExpression {
-
-  private static final int index = 0;
-  private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss";
-  private static final String outputDateFormat = "yyyy-MM-dd";
-  /**
-   * Date and Timestamp formats used to parse
-   * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}.
-   */
-  private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
-      .append(null/*printer*/, new DateTimeParser[] {
-          // date formats
-          DateTimeFormat.forPattern("yy-MM-dd").getParser(),
-          DateTimeFormat.forPattern("yy/MM/dd").getParser(),
-          DateTimeFormat.forPattern("yy.MM.dd").getParser(),
-          DateTimeFormat.forPattern("yyMMdd").getParser(),
-          DateTimeFormat.forPattern("yyyyMMdd").getParser(),
-          DateTimeFormat.forPattern("yyyy-MM-dd").getParser(),
-          DateTimeFormat.forPattern("yyyy/MM/dd").getParser(),
-          DateTimeFormat.forPattern("yyyy.MM.dd").getParser(),
-          // datetime formats
-          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(),
-          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(),
-          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(),
-          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(),
-          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(),
-          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter()
-      .withPivotYear(2020);
-
-  public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) {
-    super(operands, castType);
-  }
-
-  @Override
-  public boolean accept() {
-    return numberOfOperands() == 1;
-  }
-
-  @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    SqlTypeName castOutputType = getOutputType();
-    switch (castOutputType) {
-      case INTEGER:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
-      case DOUBLE:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
-      case SMALLINT:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
-      case TINYINT:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
-      case BIGINT:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
-      case DECIMAL:
-        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
-            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
-      case FLOAT:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
-      case CHAR:
-      case VARCHAR:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
-      case DATE:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
-      case TIMESTAMP:
-        return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-            toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
-    }
-    throw new UnsupportedOperationException(
-        String.format("Cast to type %s not supported", castOutputType));
-  }
-
-  private Date toDate(Object inputDate, String outputFormat) {
-    try {
-      return Date
-          .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat));
-    } catch (IllegalArgumentException | UnsupportedOperationException e) {
-      throw new UnsupportedOperationException("Can't be cast to type 'Date'");
-    }
-  }
-
-  private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) {
-    try {
-      return Timestamp.valueOf(
-          dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute()
-              .roundCeilingCopy().toString(outputFormat));
-    } catch (IllegalArgumentException | UnsupportedOperationException e) {
-      throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java
deleted file mode 100644
index cb8baac..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite.
- *
- * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression}
- * as its operands, and return a value with type {@link SqlTypeName}.
- *
- */
-public abstract class BeamSqlExpression implements Serializable {
-  protected List<BeamSqlExpression> operands;
-  protected SqlTypeName outputType;
-
-  protected BeamSqlExpression(){}
-
-  public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    this.operands = operands;
-    this.outputType = outputType;
-  }
-
-  public BeamSqlExpression op(int idx) {
-    return operands.get(idx);
-  }
-
-  public SqlTypeName opType(int idx) {
-    return op(idx).getOutputType();
-  }
-
-  public <T> T opValueEvaluated(int idx, BeamSqlRow row) {
-    return (T) op(idx).evaluate(row).getValue();
-  }
-
-  /**
-   * assertion to make sure the input and output are supported in this expression.
-   */
-  public abstract boolean accept();
-
-  /**
-   * Apply input record {@link BeamSqlRow} to this expression,
-   * the output value is wrapped with {@link BeamSqlPrimitive}.
-   */
-  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
-
-  public List<BeamSqlExpression> getOperands() {
-    return operands;
-  }
-
-  public SqlTypeName getOutputType() {
-    return outputType;
-  }
-
-  public int numberOfOperands() {
-    return operands.size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java
deleted file mode 100644
index 7ba4a46..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * An primitive operation for direct field extraction.
- */
-public class BeamSqlInputRefExpression extends BeamSqlExpression {
-  private int inputRef;
-
-  public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) {
-    super(null, sqlTypeName);
-    this.inputRef = inputRef;
-  }
-
-  @Override
-  public boolean accept() {
-    return true;
-  }
-
-  @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java
deleted file mode 100644
index 6a8216b..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.NlsString;
-
-/**
- * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
- * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}.
- *
- */
-public class BeamSqlPrimitive<T> extends BeamSqlExpression {
-  private T value;
-
-  private BeamSqlPrimitive() {
-  }
-
-  private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  /**
-   * A builder function to create from Type and value directly.
-   */
-  public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){
-    BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<>();
-    exp.outputType = outputType;
-    exp.value = value;
-    if (!exp.accept()) {
-      throw new IllegalArgumentException(
-          String.format("value [%s] doesn't match type [%s].", value, outputType));
-    }
-    return exp;
-  }
-
-  public SqlTypeName getOutputType() {
-    return outputType;
-  }
-
-  public T getValue() {
-    return value;
-  }
-
-  public long getLong() {
-    return (Long) getValue();
-  }
-
-  public double getDouble() {
-    return (Double) getValue();
-  }
-
-  public float getFloat() {
-    return (Float) getValue();
-  }
-
-  public int getInteger() {
-    return (Integer) getValue();
-  }
-
-  public short getShort() {
-    return (Short) getValue();
-  }
-
-  public byte getByte() {
-    return (Byte) getValue();
-  }
-  public boolean getBoolean() {
-    return (Boolean) getValue();
-  }
-
-  public String getString() {
-    return (String) getValue();
-  }
-
-  public Date getDate() {
-    return (Date) getValue();
-  }
-
-  public BigDecimal getDecimal() {
-    return (BigDecimal) getValue();
-  }
-
-  @Override
-  public boolean accept() {
-    if (value == null) {
-      return true;
-    }
-
-    switch (outputType) {
-    case BIGINT:
-      return value instanceof Long;
-    case DECIMAL:
-      return value instanceof BigDecimal;
-    case DOUBLE:
-      return value instanceof Double;
-    case FLOAT:
-      return value instanceof Float;
-    case INTEGER:
-      return value instanceof Integer;
-    case SMALLINT:
-      return value instanceof Short;
-    case TINYINT:
-      return value instanceof Byte;
-    case BOOLEAN:
-      return value instanceof Boolean;
-    case CHAR:
-    case VARCHAR:
-      return value instanceof String || value instanceof NlsString;
-    case TIME:
-      return value instanceof GregorianCalendar;
-    case TIMESTAMP:
-    case DATE:
-      return value instanceof Date;
-    case INTERVAL_HOUR:
-      return value instanceof BigDecimal;
-    case INTERVAL_MINUTE:
-      return value instanceof BigDecimal;
-    case SYMBOL:
-      // for SYMBOL, it supports anything...
-      return true;
-    default:
-      throw new UnsupportedOperationException(outputType.name());
-    }
-  }
-
-  @Override
-  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
-    return this;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java
deleted file mode 100644
index 7b4894a..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} for REINTERPRET.
- *
- * <p>Currently only converting from {@link SqlTypeName#DATETIME_TYPES}
- * to {@code BIGINT} is supported.
- */
-public class BeamSqlReinterpretExpression extends BeamSqlExpression {
-  public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  @Override public boolean accept() {
-    return getOperands().size() == 1
-        && outputType == SqlTypeName.BIGINT
-        && SqlTypeName.DATETIME_TYPES.contains(opType(0));
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    if (opType(0) == SqlTypeName.TIME) {
-      GregorianCalendar date = opValueEvaluated(0, inputRow);
-      return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
-
-    } else {
-      Date date = opValueEvaluated(0, inputRow);
-      return BeamSqlPrimitive.of(outputType, date.getTime());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java
deleted file mode 100644
index 42e511d..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * invoke a UDF function.
- */
-public class BeamSqlUdfExpression extends BeamSqlExpression {
-  //as Method is not Serializable, need to keep class/method information, and rebuild it.
-  private transient Method method;
-  private String className;
-  private String methodName;
-  private List<String> paraClassName = new ArrayList<>();
-
-  public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps,
-      SqlTypeName sqlTypeName) {
-    super(subExps, sqlTypeName);
-    this.method = method;
-
-    this.className = method.getDeclaringClass().getName();
-    this.methodName = method.getName();
-    for (Class<?> c : method.getParameterTypes()) {
-      paraClassName.add(c.getName());
-    }
-  }
-
-  @Override
-  public boolean accept() {
-    return true;
-  }
-
-  @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    if (method == null) {
-      reConstructMethod();
-    }
-    try {
-      List<Object> paras = new ArrayList<>();
-      for (BeamSqlExpression e : getOperands()) {
-        paras.add(e.evaluate(inputRow).getValue());
-      }
-
-      return BeamSqlPrimitive.of(getOutputType(),
-          method.invoke(null, paras.toArray(new Object[]{})));
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  /**
-   * re-construct method from class/method.
-   */
-  private void reConstructMethod() {
-    try {
-      List<Class<?>> paraClass = new ArrayList<>();
-      for (String pc : paraClassName) {
-        paraClass.add(Class.forName(pc));
-      }
-      method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {}));
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java
deleted file mode 100644
index 76f602c..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operation.
- *
- * <p>These operators returns the <em>end</em> timestamp of window.
- */
-public class BeamSqlWindowEndExpression extends BeamSqlExpression {
-
-  @Override
-  public boolean accept() {
-    return true;
-  }
-
-  @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
-    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        new Date(inputRow.getWindowEnd().getMillis()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java
deleted file mode 100644
index 21ec6dc..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.util.Date;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operation.
- *
- * <p>These functions don't change the timestamp field, instead it's used to indicate
- * the event_timestamp field, and how the window is defined.
- */
-public class BeamSqlWindowExpression extends BeamSqlExpression {
-
-  public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  @Override
-  public boolean accept() {
-    return operands.get(0).getOutputType().equals(SqlTypeName.DATE)
-        || operands.get(0).getOutputType().equals(SqlTypeName.TIME)
-        || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP);
-  }
-
-  @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
-    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        (Date) operands.get(0).evaluate(inputRow).getValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java
deleted file mode 100644
index a38fd12..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
-
-import java.util.Date;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START},
- * {@code SESSION_START} operation.
- *
- * <p>These operators returns the <em>start</em> timestamp of window.
- */
-public class BeamSqlWindowStartExpression extends BeamSqlExpression {
-
-  @Override
-  public boolean accept() {
-    return true;
-  }
-
-  @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
-    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        new Date(inputRow.getWindowStart().getMillis()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
deleted file mode 100644
index 67a35fc..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Base class for all arithmetic operators.
- */
-public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
-  private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>();
-  static {
-    ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT);
-    ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT);
-    ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER);
-    ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT);
-    ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT);
-    ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE);
-    ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL);
-  }
-
-  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
-    super(operands, deduceOutputType(operands.get(0).getOutputType(),
-        operands.get(1).getOutputType()));
-  }
-
-  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
-    BigDecimal left = BigDecimal.valueOf(
-        Double.valueOf(opValueEvaluated(0, inputRow).toString()));
-    BigDecimal right = BigDecimal.valueOf(
-        Double.valueOf(opValueEvaluated(1, inputRow).toString()));
-
-    BigDecimal result = calc(left, right);
-    return getCorrectlyTypedResult(result);
-  }
-
-  protected abstract BigDecimal calc(BigDecimal left, BigDecimal right);
-
-  protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) {
-    int leftIndex = ORDERED_APPROX_TYPES.indexOf(left);
-    int rightIndex = ORDERED_APPROX_TYPES.indexOf(right);
-    if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT)
-        && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) {
-      return SqlTypeName.DOUBLE;
-    }
-
-    if (leftIndex < rightIndex) {
-      return right;
-    } else if (leftIndex > rightIndex) {
-      return left;
-    } else {
-      return left;
-    }
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() != 2) {
-      return false;
-    }
-
-    for (BeamSqlExpression operand : operands) {
-      if (!SqlTypeName.NUMERIC_TYPES.contains(operand.getOutputType())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) {
-    Number actualValue;
-    switch (outputType) {
-      case TINYINT:
-        actualValue = rawResult.byteValue();
-        break;
-      case SMALLINT:
-        actualValue = rawResult.shortValue();
-        break;
-      case INTEGER:
-        actualValue = rawResult.intValue();
-        break;
-      case BIGINT:
-        actualValue = rawResult.longValue();
-        break;
-      case FLOAT:
-        actualValue = rawResult.floatValue();
-        break;
-      case DOUBLE:
-        actualValue = rawResult.doubleValue();
-        break;
-      case DECIMAL:
-      default:
-        actualValue = rawResult;
-    }
-    return BeamSqlPrimitive.of(outputType, actualValue);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
deleted file mode 100644
index fbe3fc4..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
-
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-
-/**
- * '/' operator.
- */
-public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
-  public BeamSqlDivideExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
-    return left.divide(right, 10, RoundingMode.HALF_EVEN);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
deleted file mode 100644
index 0241574..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
-
-import java.math.BigDecimal;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-
-/**
- * '-' operator.
- */
-public class BeamSqlMinusExpression extends BeamSqlArithmeticExpression {
-  public BeamSqlMinusExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
-    return left.subtract(right);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
deleted file mode 100644
index fc137da..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
-
-import java.math.BigDecimal;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-
-/**
- * '%' operator.
- */
-public class BeamSqlModExpression extends BeamSqlArithmeticExpression {
-  public BeamSqlModExpression(List<BeamSqlExpression> operands) {
-    super(operands, operands.get(1).getOutputType());
-  }
-
-  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
-    return BigDecimal.valueOf(left.doubleValue() % right.doubleValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
deleted file mode 100644
index 7ea974c..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
-
-import java.math.BigDecimal;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-
-/**
- * '*' operator.
- */
-public class BeamSqlMultiplyExpression extends BeamSqlArithmeticExpression {
-  public BeamSqlMultiplyExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
-    return left.multiply(right);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
deleted file mode 100644
index 3ce806f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
-
-import java.math.BigDecimal;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-
-/**
- * '+' operator.
- */
-public class BeamSqlPlusExpression extends BeamSqlArithmeticExpression {
-  public BeamSqlPlusExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
-    return left.add(right);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java
deleted file mode 100644
index 5f8d649..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Arithmetic operators.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
deleted file mode 100644
index 9b6b527..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@link BeamSqlCompareExpression} is used for compare operations.
- *
- * <p>See {@link BeamSqlEqualsExpression}, {@link BeamSqlLessThanExpression},
- * {@link BeamSqlLessThanOrEqualsExpression}, {@link BeamSqlGreaterThanExpression},
- * {@link BeamSqlGreaterThanOrEqualsExpression} and {@link BeamSqlNotEqualsExpression}
- * for more details.
- *
- */
-public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
-
-  private BeamSqlCompareExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  public BeamSqlCompareExpression(List<BeamSqlExpression> operands) {
-    this(operands, SqlTypeName.BOOLEAN);
-  }
-
-  /**
-   * Compare operation must have 2 operands.
-   */
-  @Override
-  public boolean accept() {
-    return operands.size() == 2;
-  }
-
-  @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
-    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
-    Object rightValue = operands.get(1).evaluate(inputRow).getValue();
-    switch (operands.get(0).getOutputType()) {
-    case BIGINT:
-    case DECIMAL:
-    case DOUBLE:
-    case FLOAT:
-    case INTEGER:
-    case SMALLINT:
-    case TINYINT:
-      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
-          compare((Number) leftValue, (Number) rightValue));
-    case BOOLEAN:
-      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
-          compare((Boolean) leftValue, (Boolean) rightValue));
-    case VARCHAR:
-      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
-          compare((CharSequence) leftValue, (CharSequence) rightValue));
-    default:
-      throw new UnsupportedOperationException(toString());
-    }
-  }
-
-  /**
-   * Compare between String values, mapping to {@link SqlTypeName#VARCHAR}.
-   */
-  public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue);
-
-  /**
-   * Compare between Boolean values, mapping to {@link SqlTypeName#BOOLEAN}.
-   */
-  public abstract Boolean compare(Boolean leftValue, Boolean rightValue);
-
-  /**
-   * Compare between Number values, including {@link SqlTypeName#BIGINT},
-   * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT},
-   * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and {@link SqlTypeName#TINYINT}.
-   */
-  public abstract Boolean compare(Number leftValue, Number rightValue);
-
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
deleted file mode 100644
index b9767e3..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-
-/**
- * {@code BeamSqlExpression} for {@code =} operation.
- */
-public class BeamSqlEqualsExpression extends BeamSqlCompareExpression {
-
-  public BeamSqlEqualsExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override
-  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
-    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) == 0;
-  }
-
-  @Override
-  public Boolean compare(Boolean leftValue, Boolean rightValue) {
-    return !(leftValue ^ rightValue);
-  }
-
-  @Override
-  public Boolean compare(Number leftValue, Number rightValue) {
-    return (leftValue == null && rightValue == null)
-        || (leftValue != null && rightValue != null
-              && leftValue.floatValue() == (rightValue).floatValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
deleted file mode 100644
index 5fdf27b..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-
-/**
- * {@code BeamSqlExpression} for {@code >} operation.
- */
-public class BeamSqlGreaterThanExpression extends BeamSqlCompareExpression {
-
-  public BeamSqlGreaterThanExpression(List<BeamSqlExpression> operands) {
-    super(operands);
-  }
-
-  @Override
-  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
-    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) > 0;
-  }
-
-  @Override
-  public Boolean compare(Boolean leftValue, Boolean rightValue) {
-    throw new IllegalArgumentException("> is not supported for Boolean.");
-  }
-
-  @Override
-  public Boolean compare(Number leftValue, Number rightValue) {
-    return (leftValue == null && rightValue == null)
-        || (leftValue != null && rightValue != null
-              && leftValue.floatValue() > (rightValue).floatValue());
-  }
-
-}