You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:58 UTC

[35/59] beam git commit: rename package org.apache.beam.dsls.sql to org.apache.beam.sdk.extensions.sql

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
new file mode 100644
index 0000000..fc137da
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '%' operator.
+ */
+public class BeamSqlModExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlModExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(1).getOutputType());
+  }
+
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return BigDecimal.valueOf(left.doubleValue() % right.doubleValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
new file mode 100644
index 0000000..7ea974c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '*' operator.
+ */
+public class BeamSqlMultiplyExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlMultiplyExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.multiply(right);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
new file mode 100644
index 0000000..3ce806f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '+' operator.
+ */
+public class BeamSqlPlusExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlPlusExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.add(right);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java
new file mode 100644
index 0000000..5f8d649
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Arithmetic operators.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
new file mode 100644
index 0000000..9b6b527
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@link BeamSqlCompareExpression} is used for compare operations.
+ *
+ * <p>See {@link BeamSqlEqualsExpression}, {@link BeamSqlLessThanExpression},
+ * {@link BeamSqlLessThanOrEqualsExpression}, {@link BeamSqlGreaterThanExpression},
+ * {@link BeamSqlGreaterThanOrEqualsExpression} and {@link BeamSqlNotEqualsExpression}
+ * for more details.
+ *
+ */
+public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
+
+  private BeamSqlCompareExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlCompareExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * Compare operation must have 2 operands.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 2;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+    Object rightValue = operands.get(1).evaluate(inputRow).getValue();
+    switch (operands.get(0).getOutputType()) {
+    case BIGINT:
+    case DECIMAL:
+    case DOUBLE:
+    case FLOAT:
+    case INTEGER:
+    case SMALLINT:
+    case TINYINT:
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+          compare((Number) leftValue, (Number) rightValue));
+    case BOOLEAN:
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+          compare((Boolean) leftValue, (Boolean) rightValue));
+    case VARCHAR:
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+          compare((CharSequence) leftValue, (CharSequence) rightValue));
+    default:
+      throw new UnsupportedOperationException(toString());
+    }
+  }
+
+  /**
+   * Compare between String values, mapping to {@link SqlTypeName#VARCHAR}.
+   */
+  public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue);
+
+  /**
+   * Compare between Boolean values, mapping to {@link SqlTypeName#BOOLEAN}.
+   */
+  public abstract Boolean compare(Boolean leftValue, Boolean rightValue);
+
+  /**
+   * Compare between Number values, including {@link SqlTypeName#BIGINT},
+   * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT},
+   * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and {@link SqlTypeName#TINYINT}.
+   */
+  public abstract Boolean compare(Number leftValue, Number rightValue);
+
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
new file mode 100644
index 0000000..b9767e3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code =} operation.
+ */
+public class BeamSqlEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) == 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    return !(leftValue ^ rightValue);
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() == (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
new file mode 100644
index 0000000..5fdf27b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code >} operation.
+ */
+public class BeamSqlGreaterThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlGreaterThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) > 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("> is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() > (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
new file mode 100644
index 0000000..ae22054
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code >=} operation.
+ */
+public class BeamSqlGreaterThanOrEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlGreaterThanOrEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) >= 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException(">= is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() >= (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
new file mode 100644
index 0000000..78660cb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'IS NOT NULL' operation.
+ */
+public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
+
+  private BeamSqlIsNotNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlIsNotNullExpression(BeamSqlExpression operand){
+    this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * only one operand is required.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
new file mode 100644
index 0000000..013d8d7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'IS NULL' operation.
+ */
+public class BeamSqlIsNullExpression extends BeamSqlExpression {
+
+  private BeamSqlIsNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlIsNullExpression(BeamSqlExpression operand){
+    this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * only one operand is required.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
new file mode 100644
index 0000000..a6e5cd9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code <} operation.
+ */
+public class BeamSqlLessThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) < 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("< is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() < (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
new file mode 100644
index 0000000..52a604c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code <=} operation.
+ */
+public class BeamSqlLessThanOrEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanOrEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) <= 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("<= is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() <= (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
new file mode 100644
index 0000000..1c5b072
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code <>} operation.
+ */
+public class BeamSqlNotEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlNotEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) != 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    return leftValue ^ rightValue;
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() != (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/package-info.java
new file mode 100644
index 0000000..94ed727
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Comparison operators.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
new file mode 100644
index 0000000..e3d6cc8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Collections;
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for CURRENT_DATE and LOCALTIME.
+ *
+ * <p>Returns the current date in the session time zone, in a value of datatype DATE.
+ */
+public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
+  public BeamSqlCurrentDateExpression() {
+    super(Collections.<BeamSqlExpression>emptyList(), SqlTypeName.DATE);
+  }
+  @Override public boolean accept() {
+    return getOperands().size() == 0;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(outputType, new Date());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
new file mode 100644
index 0000000..edabe53
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.TimeZone;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for LOCALTIME and CURRENT_TIME.
+ *
+ * <p>Returns the current date and time in the session time zone in a value of datatype TIME, with
+ * precision digits of precision.
+ *
+ * <p>NOTE: for simplicity, we will ignore the {@code precision} param.
+ */
+public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
+  public BeamSqlCurrentTimeExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIME);
+  }
+  @Override public boolean accept() {
+    int opCount = getOperands().size();
+    return opCount <= 1;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault());
+    ret.setTime(new Date());
+    return BeamSqlPrimitive.of(outputType, ret);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
new file mode 100644
index 0000000..73174b3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for LOCALTIMESTAMP and CURRENT_TIMESTAMP.
+ *
+ * <p>Returns the current date and time in the session time zone in a value of datatype TIMESTAMP,
+ * with precision digits of precision.
+ *
+ * <p>NOTE: for simplicity, we will ignore the {@code precision} param.
+ */
+public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression {
+  public BeamSqlCurrentTimestampExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIMESTAMP);
+  }
+  @Override public boolean accept() {
+    int opCount = getOperands().size();
+    return opCount <= 1;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(outputType, new Date());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
new file mode 100644
index 0000000..e575d6e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for CEIL(date).
+ *
+ * <p>NOTE: only support CEIL for {@link TimeUnitRange#YEAR} and {@link TimeUnitRange#MONTH}.
+ */
+public class BeamSqlDateCeilExpression extends BeamSqlExpression {
+  public BeamSqlDateCeilExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIMESTAMP);
+  }
+  @Override public boolean accept() {
+    return operands.size() == 2
+        && opType(1) == SqlTypeName.SYMBOL;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date date = opValueEvaluated(0, inputRow);
+    long time = date.getTime();
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
+
+    long newTime = DateTimeUtils.unixTimestampCeil(unit, time);
+    Date newDate = new Date(newTime);
+
+    return BeamSqlPrimitive.of(outputType, newDate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
new file mode 100644
index 0000000..4bad353
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for FLOOR(date).
+ *
+ * <p>NOTE: only support FLOOR for {@link TimeUnitRange#YEAR} and {@link TimeUnitRange#MONTH}.
+ */
+public class BeamSqlDateFloorExpression extends BeamSqlExpression {
+  public BeamSqlDateFloorExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DATE);
+  }
+  @Override public boolean accept() {
+    return operands.size() == 2
+        && opType(1) == SqlTypeName.SYMBOL;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date date = opValueEvaluated(0, inputRow);
+    long time = date.getTime();
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
+
+    long newTime = DateTimeUtils.unixTimestampFloor(unit, time);
+    Date newDate = new Date(newTime);
+
+    return BeamSqlPrimitive.of(outputType, newDate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlExtractExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlExtractExpression.java
new file mode 100644
index 0000000..a7f3071
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for EXTRACT.
+ *
+ * <p>The following date functions also implicitly converted to {@code EXTRACT}:
+ * <ul>
+ *   <li>YEAR(date) =&gt; EXTRACT(YEAR FROM date)</li>
+ *   <li>MONTH(date) =&gt; EXTRACT(MONTH FROM date)</li>
+ *   <li>DAY(date) =&gt; EXTRACT(DAY FROM date)</li>
+ *   <li>QUARTER(date) =&gt; EXTRACT(QUARTER FROM date)</li>
+ *   <li>WEEK(date) =&gt; EXTRACT(WEEK FROM date)</li>
+ *   <li>DAYOFYEAR(date) =&gt; EXTRACT(DOY FROM date)</li>
+ *   <li>DAYOFMONTH(date) =&gt; EXTRACT(DAY FROM date)</li>
+ *   <li>DAYOFWEEK(date) =&gt; EXTRACT(DOW FROM date)</li>
+ * </ul>
+ */
+public class BeamSqlExtractExpression extends BeamSqlExpression {
+  private static final Map<TimeUnitRange, Integer> typeMapping = new HashMap<>();
+  static {
+    typeMapping.put(TimeUnitRange.DOW, Calendar.DAY_OF_WEEK);
+    typeMapping.put(TimeUnitRange.DOY, Calendar.DAY_OF_YEAR);
+    typeMapping.put(TimeUnitRange.WEEK, Calendar.WEEK_OF_YEAR);
+  }
+
+  public BeamSqlExtractExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.BIGINT);
+  }
+  @Override public boolean accept() {
+    return operands.size() == 2
+        && opType(1) == SqlTypeName.BIGINT;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Long time = opValueEvaluated(1, inputRow);
+
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();
+
+    switch (unit) {
+      case YEAR:
+      case MONTH:
+      case DAY:
+        Long timeByDay = time / 1000 / 3600 / 24;
+        Long extracted = DateTimeUtils.unixDateExtract(
+            unit,
+            timeByDay
+        );
+        return BeamSqlPrimitive.of(outputType, extracted);
+
+      case DOY:
+      case DOW:
+      case WEEK:
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(new Date(time));
+        return BeamSqlPrimitive.of(outputType, (long) calendar.get(typeMapping.get(unit)));
+
+      case QUARTER:
+        calendar = Calendar.getInstance();
+        calendar.setTime(new Date(time));
+        long ret = calendar.get(Calendar.MONTH) / 3;
+        if (ret * 3 < calendar.get(Calendar.MONTH)) {
+          ret += 1;
+        }
+        return BeamSqlPrimitive.of(outputType, ret);
+
+      default:
+        throw new UnsupportedOperationException(
+            "Extract for time unit: " + unit + " not supported!");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/package-info.java
new file mode 100644
index 0000000..1ccd9d6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * date functions.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlAndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlAndExpression.java
new file mode 100644
index 0000000..eca945b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.logical;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'AND' operation.
+ */
+public class BeamSqlAndExpression extends BeamSqlLogicalExpression {
+  public BeamSqlAndExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    boolean result = true;
+    for (BeamSqlExpression exp : operands) {
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
+      result = result && expOut.getValue();
+      if (!result) {
+        break;
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
new file mode 100644
index 0000000..3d2e050
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.logical;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for Logical operators.
+ */
+public abstract class BeamSqlLogicalExpression extends BeamSqlExpression {
+  private BeamSqlLogicalExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+  public BeamSqlLogicalExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  @Override
+  public boolean accept() {
+    for (BeamSqlExpression exp : operands) {
+      // only accept BOOLEAN expression as operand
+      if (!exp.getOutputType().equals(SqlTypeName.BOOLEAN)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlNotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlNotExpression.java
new file mode 100644
index 0000000..521b340
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.logical;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for logical operator: NOT.
+ *
+ * <p>Whether boolean is not TRUE; returns UNKNOWN if boolean is UNKNOWN.
+ */
+public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
+  public BeamSqlNotExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public boolean accept() {
+    if (numberOfOperands() != 1) {
+      return false;
+    }
+    return super.accept();
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Boolean value = opValueEvaluated(0, inputRow);
+    if (value == null) {
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);
+    } else {
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, !value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlOrExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlOrExpression.java
new file mode 100644
index 0000000..a9d8e8a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.logical;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'OR' operation.
+ */
+public class BeamSqlOrExpression extends BeamSqlLogicalExpression {
+  public BeamSqlOrExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    boolean result = false;
+    for (BeamSqlExpression exp : operands) {
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
+        result = result || expOut.getValue();
+        if (result) {
+          break;
+        }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/package-info.java
new file mode 100644
index 0000000..b7ef1ba
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Logical operators.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.logical;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAbsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAbsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAbsExpression.java
new file mode 100644
index 0000000..0a68563
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAbsExpression.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ABS' function.
+ */
+public class BeamSqlAbsExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlAbsExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(0).getOutputType());
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    BeamSqlPrimitive result = null;
+    switch (op.getOutputType()) {
+      case INTEGER:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.INTEGER, SqlFunctions.abs(op.getInteger()));
+        break;
+      case BIGINT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.BIGINT, SqlFunctions.abs(op.getLong()));
+        break;
+      case TINYINT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.TINYINT, SqlFunctions.abs(op.getByte()));
+        break;
+      case SMALLINT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.SMALLINT, SqlFunctions.abs(op.getShort()));
+        break;
+      case FLOAT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.FLOAT, SqlFunctions.abs(op.getFloat()));
+        break;
+      case DECIMAL:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.DECIMAL, SqlFunctions.abs(new BigDecimal(op.getValue().toString())));
+        break;
+      case DOUBLE:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.abs(op.getDouble()));
+        break;
+      default:
+        break;
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAcosExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAcosExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAcosExpression.java
new file mode 100644
index 0000000..a49d72a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAcosExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ACOS' function.
+ */
+public class BeamSqlAcosExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlAcosExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.acos(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAsinExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAsinExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAsinExpression.java
new file mode 100644
index 0000000..557ec8d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAsinExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ASIN' function.
+ */
+public class BeamSqlAsinExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlAsinExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.asin(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtan2Expression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtan2Expression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtan2Expression.java
new file mode 100644
index 0000000..4e11b42
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtan2Expression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@link BeamSqlMathBinaryExpression} for 'ATAN2' function.
+ */
+public class BeamSqlAtan2Expression extends BeamSqlMathBinaryExpression {
+
+  public BeamSqlAtan2Expression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+      BeamSqlPrimitive rightOp) {
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions
+        .atan2(SqlFunctions.toDouble(leftOp.getValue()),
+            SqlFunctions.toDouble(rightOp.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtanExpression.java
new file mode 100644
index 0000000..0991252
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtanExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ATAN' function.
+ */
+public class BeamSqlAtanExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlAtanExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.atan(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCeilExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCeilExpression.java
new file mode 100644
index 0000000..a3cb9c8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCeilExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'CEIL' function.
+ */
+public class BeamSqlCeilExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlCeilExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    switch (getOutputType()) {
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, SqlFunctions.ceil(op.getDecimal()));
+      default:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.ceil(SqlFunctions.toDouble(op.getValue())));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCosExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCosExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCosExpression.java
new file mode 100644
index 0000000..6ddd079
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCosExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'COS' function.
+ */
+public class BeamSqlCosExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlCosExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.cos(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCotExpression.java
new file mode 100644
index 0000000..9dfbd90
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCotExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'COT' function.
+ */
+public class BeamSqlCotExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlCotExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.cot(SqlFunctions.toDouble(op.getValue())));
+  }
+}