You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:33 UTC

[10/59] beam git commit: move dsls/sql to sdks/java/extensions/sql

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
new file mode 100644
index 0000000..9d2815c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite.
+ *
+ * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression}
+ * as its operands, and return a value with type {@link SqlTypeName}.
+ *
+ */
+public abstract class BeamSqlExpression implements Serializable {
+  protected List<BeamSqlExpression> operands;
+  protected SqlTypeName outputType;
+
+  protected BeamSqlExpression(){}
+
+  public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    this.operands = operands;
+    this.outputType = outputType;
+  }
+
+  public BeamSqlExpression op(int idx) {
+    return operands.get(idx);
+  }
+
+  public SqlTypeName opType(int idx) {
+    return op(idx).getOutputType();
+  }
+
+  public <T> T opValueEvaluated(int idx, BeamSqlRow row) {
+    return (T) op(idx).evaluate(row).getValue();
+  }
+
+  /**
+   * assertion to make sure the input and output are supported in this expression.
+   */
+  public abstract boolean accept();
+
+  /**
+   * Apply input record {@link BeamSqlRow} to this expression,
+   * the output value is wrapped with {@link BeamSqlPrimitive}.
+   */
+  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
+
+  public List<BeamSqlExpression> getOperands() {
+    return operands;
+  }
+
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+
+  public int numberOfOperands() {
+    return operands.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
new file mode 100644
index 0000000..710460b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * An primitive operation for direct field extraction.
+ */
+public class BeamSqlInputRefExpression extends BeamSqlExpression {
+  private int inputRef;
+
+  public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) {
+    super(null, sqlTypeName);
+    this.inputRef = inputRef;
+  }
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
new file mode 100644
index 0000000..51724bb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
+ * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}.
+ *
+ */
+public class BeamSqlPrimitive<T> extends BeamSqlExpression {
+  private T value;
+
+  private BeamSqlPrimitive() {
+  }
+
+  private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * A builder function to create from Type and value directly.
+   */
+  public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){
+    BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<>();
+    exp.outputType = outputType;
+    exp.value = value;
+    if (!exp.accept()) {
+      throw new IllegalArgumentException(
+          String.format("value [%s] doesn't match type [%s].", value, outputType));
+    }
+    return exp;
+  }
+
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+
+  public T getValue() {
+    return value;
+  }
+
+  public long getLong() {
+    return (Long) getValue();
+  }
+
+  public double getDouble() {
+    return (Double) getValue();
+  }
+
+  public float getFloat() {
+    return (Float) getValue();
+  }
+
+  public int getInteger() {
+    return (Integer) getValue();
+  }
+
+  public short getShort() {
+    return (Short) getValue();
+  }
+
+  public byte getByte() {
+    return (Byte) getValue();
+  }
+  public boolean getBoolean() {
+    return (Boolean) getValue();
+  }
+
+  public String getString() {
+    return (String) getValue();
+  }
+
+  public Date getDate() {
+    return (Date) getValue();
+  }
+
+  public BigDecimal getDecimal() {
+    return (BigDecimal) getValue();
+  }
+
+  @Override
+  public boolean accept() {
+    if (value == null) {
+      return true;
+    }
+
+    switch (outputType) {
+    case BIGINT:
+      return value instanceof Long;
+    case DECIMAL:
+      return value instanceof BigDecimal;
+    case DOUBLE:
+      return value instanceof Double;
+    case FLOAT:
+      return value instanceof Float;
+    case INTEGER:
+      return value instanceof Integer;
+    case SMALLINT:
+      return value instanceof Short;
+    case TINYINT:
+      return value instanceof Byte;
+    case BOOLEAN:
+      return value instanceof Boolean;
+    case CHAR:
+    case VARCHAR:
+      return value instanceof String || value instanceof NlsString;
+    case TIME:
+      return value instanceof GregorianCalendar;
+    case TIMESTAMP:
+    case DATE:
+      return value instanceof Date;
+    case INTERVAL_HOUR:
+      return value instanceof BigDecimal;
+    case INTERVAL_MINUTE:
+      return value instanceof BigDecimal;
+    case SYMBOL:
+      // for SYMBOL, it supports anything...
+      return true;
+    default:
+      throw new UnsupportedOperationException(outputType.name());
+    }
+  }
+
+  @Override
+  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
+    return this;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
new file mode 100644
index 0000000..efdb2df
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for REINTERPRET.
+ *
+ * <p>Currently only converting from {@link SqlTypeName#DATETIME_TYPES}
+ * to {@code BIGINT} is supported.
+ */
+public class BeamSqlReinterpretExpression extends BeamSqlExpression {
+  public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override public boolean accept() {
+    return getOperands().size() == 1
+        && outputType == SqlTypeName.BIGINT
+        && SqlTypeName.DATETIME_TYPES.contains(opType(0));
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    if (opType(0) == SqlTypeName.TIME) {
+      GregorianCalendar date = opValueEvaluated(0, inputRow);
+      return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
+
+    } else {
+      Date date = opValueEvaluated(0, inputRow);
+      return BeamSqlPrimitive.of(outputType, date.getTime());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
new file mode 100644
index 0000000..e389ef9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * invoke a UDF function.
+ */
+public class BeamSqlUdfExpression extends BeamSqlExpression {
+  //as Method is not Serializable, need to keep class/method information, and rebuild it.
+  private transient Method method;
+  private String className;
+  private String methodName;
+  private List<String> paraClassName = new ArrayList<>();
+
+  public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps,
+      SqlTypeName sqlTypeName) {
+    super(subExps, sqlTypeName);
+    this.method = method;
+
+    this.className = method.getDeclaringClass().getName();
+    this.methodName = method.getName();
+    for (Class<?> c : method.getParameterTypes()) {
+      paraClassName.add(c.getName());
+    }
+  }
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    if (method == null) {
+      reConstructMethod();
+    }
+    try {
+      List<Object> paras = new ArrayList<>();
+      for (BeamSqlExpression e : getOperands()) {
+        paras.add(e.evaluate(inputRow).getValue());
+      }
+
+      return BeamSqlPrimitive.of(getOutputType(),
+          method.invoke(null, paras.toArray(new Object[]{})));
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * re-construct method from class/method.
+   */
+  private void reConstructMethod() {
+    try {
+      List<Class<?>> paraClass = new ArrayList<>();
+      for (String pc : paraClassName) {
+        paraClass.add(Class.forName(pc));
+      }
+      method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {}));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
new file mode 100644
index 0000000..ecc6939
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operation.
+ *
+ * <p>These operators returns the <em>end</em> timestamp of window.
+ */
+public class BeamSqlWindowEndExpression extends BeamSqlExpression {
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+        new Date(inputRow.getWindowEnd().getMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
new file mode 100644
index 0000000..71f0672
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operation.
+ *
+ * <p>These functions don't change the timestamp field, instead it's used to indicate
+ * the event_timestamp field, and how the window is defined.
+ */
+public class BeamSqlWindowExpression extends BeamSqlExpression {
+
+  public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override
+  public boolean accept() {
+    return operands.get(0).getOutputType().equals(SqlTypeName.DATE)
+        || operands.get(0).getOutputType().equals(SqlTypeName.TIME)
+        || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP);
+  }
+
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+        (Date) operands.get(0).evaluate(inputRow).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
new file mode 100644
index 0000000..f3aba2e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START},
+ * {@code SESSION_START} operation.
+ *
+ * <p>These operators returns the <em>start</em> timestamp of window.
+ */
+public class BeamSqlWindowStartExpression extends BeamSqlExpression {
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+        new Date(inputRow.getWindowStart().getMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
new file mode 100644
index 0000000..d62123c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for all arithmetic operators.
+ */
+public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
+  private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>();
+  static {
+    ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL);
+  }
+
+  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
+    super(operands, deduceOutputType(operands.get(0).getOutputType(),
+        operands.get(1).getOutputType()));
+  }
+
+  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+    BigDecimal left = BigDecimal.valueOf(
+        Double.valueOf(opValueEvaluated(0, inputRow).toString()));
+    BigDecimal right = BigDecimal.valueOf(
+        Double.valueOf(opValueEvaluated(1, inputRow).toString()));
+
+    BigDecimal result = calc(left, right);
+    return getCorrectlyTypedResult(result);
+  }
+
+  protected abstract BigDecimal calc(BigDecimal left, BigDecimal right);
+
+  protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) {
+    int leftIndex = ORDERED_APPROX_TYPES.indexOf(left);
+    int rightIndex = ORDERED_APPROX_TYPES.indexOf(right);
+    if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT)
+        && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) {
+      return SqlTypeName.DOUBLE;
+    }
+
+    if (leftIndex < rightIndex) {
+      return right;
+    } else if (leftIndex > rightIndex) {
+      return left;
+    } else {
+      return left;
+    }
+  }
+
+  @Override public boolean accept() {
+    if (operands.size() != 2) {
+      return false;
+    }
+
+    for (BeamSqlExpression operand : operands) {
+      if (!SqlTypeName.NUMERIC_TYPES.contains(operand.getOutputType())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) {
+    Number actualValue;
+    switch (outputType) {
+      case TINYINT:
+        actualValue = rawResult.byteValue();
+        break;
+      case SMALLINT:
+        actualValue = rawResult.shortValue();
+        break;
+      case INTEGER:
+        actualValue = rawResult.intValue();
+        break;
+      case BIGINT:
+        actualValue = rawResult.longValue();
+        break;
+      case FLOAT:
+        actualValue = rawResult.floatValue();
+        break;
+      case DOUBLE:
+        actualValue = rawResult.doubleValue();
+        break;
+      case DECIMAL:
+      default:
+        actualValue = rawResult;
+    }
+    return BeamSqlPrimitive.of(outputType, actualValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
new file mode 100644
index 0000000..c5fe02b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '/' operator.
+ */
+public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlDivideExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.divide(right, 10, RoundingMode.HALF_EVEN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
new file mode 100644
index 0000000..fe08870
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '-' operator.
+ */
+public class BeamSqlMinusExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlMinusExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.subtract(right);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
new file mode 100644
index 0000000..11ecf25
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '%' operator.
+ */
+public class BeamSqlModExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlModExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(1).getOutputType());
+  }
+
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return BigDecimal.valueOf(left.doubleValue() % right.doubleValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
new file mode 100644
index 0000000..e16d3cb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '*' operator.
+ */
+public class BeamSqlMultiplyExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlMultiplyExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.multiply(right);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
new file mode 100644
index 0000000..5804279
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '+' operator.
+ */
+public class BeamSqlPlusExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlPlusExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.add(right);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java
new file mode 100644
index 0000000..b8f2175
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Arithmetic operators.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
new file mode 100644
index 0000000..80f0853
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@link BeamSqlCompareExpression} is used for compare operations.
+ *
+ * <p>See {@link BeamSqlEqualsExpression}, {@link BeamSqlLessThanExpression},
+ * {@link BeamSqlLessThanOrEqualsExpression}, {@link BeamSqlGreaterThanExpression},
+ * {@link BeamSqlGreaterThanOrEqualsExpression} and {@link BeamSqlNotEqualsExpression}
+ * for more details.
+ *
+ */
+public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
+
+  private BeamSqlCompareExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlCompareExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * Compare operation must have 2 operands.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 2;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+    Object rightValue = operands.get(1).evaluate(inputRow).getValue();
+    switch (operands.get(0).getOutputType()) {
+    case BIGINT:
+    case DECIMAL:
+    case DOUBLE:
+    case FLOAT:
+    case INTEGER:
+    case SMALLINT:
+    case TINYINT:
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+          compare((Number) leftValue, (Number) rightValue));
+    case BOOLEAN:
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+          compare((Boolean) leftValue, (Boolean) rightValue));
+    case VARCHAR:
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+          compare((CharSequence) leftValue, (CharSequence) rightValue));
+    default:
+      throw new UnsupportedOperationException(toString());
+    }
+  }
+
+  /**
+   * Compare between String values, mapping to {@link SqlTypeName#VARCHAR}.
+   */
+  public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue);
+
+  /**
+   * Compare between Boolean values, mapping to {@link SqlTypeName#BOOLEAN}.
+   */
+  public abstract Boolean compare(Boolean leftValue, Boolean rightValue);
+
+  /**
+   * Compare between Number values, including {@link SqlTypeName#BIGINT},
+   * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT},
+   * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and {@link SqlTypeName#TINYINT}.
+   */
+  public abstract Boolean compare(Number leftValue, Number rightValue);
+
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
new file mode 100644
index 0000000..40b015e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code =} operation.
+ */
+public class BeamSqlEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) == 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    return !(leftValue ^ rightValue);
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() == (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
new file mode 100644
index 0000000..8bfa511
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code >} operation.
+ */
+public class BeamSqlGreaterThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlGreaterThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) > 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("> is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() > (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
new file mode 100644
index 0000000..54faa35
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code >=} operation.
+ */
+public class BeamSqlGreaterThanOrEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlGreaterThanOrEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) >= 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException(">= is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() >= (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
new file mode 100644
index 0000000..6d93c5d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'IS NOT NULL' operation.
+ */
+public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
+
+  private BeamSqlIsNotNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlIsNotNullExpression(BeamSqlExpression operand){
+    this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * only one operand is required.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
new file mode 100644
index 0000000..4450f3a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'IS NULL' operation.
+ */
+public class BeamSqlIsNullExpression extends BeamSqlExpression {
+
+  private BeamSqlIsNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlIsNullExpression(BeamSqlExpression operand){
+    this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * only one operand is required.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
new file mode 100644
index 0000000..7ae6dad
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code <} operation.
+ */
+public class BeamSqlLessThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) < 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("< is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() < (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
new file mode 100644
index 0000000..4a2cef2
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code <=} operation.
+ */
+public class BeamSqlLessThanOrEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanOrEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) <= 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("<= is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() <= (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
new file mode 100644
index 0000000..e02df3d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code <>} operation.
+ */
+public class BeamSqlNotEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlNotEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) != 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    return leftValue ^ rightValue;
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    return (leftValue == null && rightValue == null)
+        || (leftValue != null && rightValue != null
+              && leftValue.floatValue() != (rightValue).floatValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java
new file mode 100644
index 0000000..eea18ff
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Comparison operators.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.comparison;

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
new file mode 100644
index 0000000..c7df5ab
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Collections;
+import java.util.Date;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for CURRENT_DATE and LOCALTIME.
+ *
+ * <p>Returns the current date in the session time zone, in a value of datatype DATE.
+ */
+public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
+  public BeamSqlCurrentDateExpression() {
+    super(Collections.<BeamSqlExpression>emptyList(), SqlTypeName.DATE);
+  }
+  @Override public boolean accept() {
+    return getOperands().size() == 0;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(outputType, new Date());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
new file mode 100644
index 0000000..46e5a43
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for LOCALTIME and CURRENT_TIME.
+ *
+ * <p>Returns the current date and time in the session time zone in a value of datatype TIME, with
+ * precision digits of precision.
+ *
+ * <p>NOTE: for simplicity, we will ignore the {@code precision} param.
+ */
+public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
+  public BeamSqlCurrentTimeExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIME);
+  }
+  @Override public boolean accept() {
+    int opCount = getOperands().size();
+    return opCount <= 1;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault());
+    ret.setTime(new Date());
+    return BeamSqlPrimitive.of(outputType, ret);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
new file mode 100644
index 0000000..303846d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for LOCALTIMESTAMP and CURRENT_TIMESTAMP.
+ *
+ * <p>Returns the current date and time in the session time zone in a value of datatype TIMESTAMP,
+ * with precision digits of precision.
+ *
+ * <p>NOTE: for simplicity, we will ignore the {@code precision} param.
+ */
+public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression {
+  public BeamSqlCurrentTimestampExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIMESTAMP);
+  }
+  @Override public boolean accept() {
+    int opCount = getOperands().size();
+    return opCount <= 1;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(outputType, new Date());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
new file mode 100644
index 0000000..59e3e9c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for CEIL(date).
+ *
+ * <p>NOTE: only support CEIL for {@link TimeUnitRange#YEAR} and {@link TimeUnitRange#MONTH}.
+ */
+public class BeamSqlDateCeilExpression extends BeamSqlExpression {
+  public BeamSqlDateCeilExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIMESTAMP);
+  }
+  @Override public boolean accept() {
+    return operands.size() == 2
+        && opType(1) == SqlTypeName.SYMBOL;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date date = opValueEvaluated(0, inputRow);
+    long time = date.getTime();
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
+
+    long newTime = DateTimeUtils.unixTimestampCeil(unit, time);
+    Date newDate = new Date(newTime);
+
+    return BeamSqlPrimitive.of(outputType, newDate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
new file mode 100644
index 0000000..64234f5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for FLOOR(date).
+ *
+ * <p>NOTE: only support FLOOR for {@link TimeUnitRange#YEAR} and {@link TimeUnitRange#MONTH}.
+ */
+public class BeamSqlDateFloorExpression extends BeamSqlExpression {
+  public BeamSqlDateFloorExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DATE);
+  }
+  @Override public boolean accept() {
+    return operands.size() == 2
+        && opType(1) == SqlTypeName.SYMBOL;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date date = opValueEvaluated(0, inputRow);
+    long time = date.getTime();
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
+
+    long newTime = DateTimeUtils.unixTimestampFloor(unit, time);
+    Date newDate = new Date(newTime);
+
+    return BeamSqlPrimitive.of(outputType, newDate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
new file mode 100644
index 0000000..d41a249
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for EXTRACT.
+ *
+ * <p>The following date functions also implicitly converted to {@code EXTRACT}:
+ * <ul>
+ *   <li>YEAR(date) =&gt; EXTRACT(YEAR FROM date)</li>
+ *   <li>MONTH(date) =&gt; EXTRACT(MONTH FROM date)</li>
+ *   <li>DAY(date) =&gt; EXTRACT(DAY FROM date)</li>
+ *   <li>QUARTER(date) =&gt; EXTRACT(QUARTER FROM date)</li>
+ *   <li>WEEK(date) =&gt; EXTRACT(WEEK FROM date)</li>
+ *   <li>DAYOFYEAR(date) =&gt; EXTRACT(DOY FROM date)</li>
+ *   <li>DAYOFMONTH(date) =&gt; EXTRACT(DAY FROM date)</li>
+ *   <li>DAYOFWEEK(date) =&gt; EXTRACT(DOW FROM date)</li>
+ * </ul>
+ */
+public class BeamSqlExtractExpression extends BeamSqlExpression {
+  private static final Map<TimeUnitRange, Integer> typeMapping = new HashMap<>();
+  static {
+    typeMapping.put(TimeUnitRange.DOW, Calendar.DAY_OF_WEEK);
+    typeMapping.put(TimeUnitRange.DOY, Calendar.DAY_OF_YEAR);
+    typeMapping.put(TimeUnitRange.WEEK, Calendar.WEEK_OF_YEAR);
+  }
+
+  public BeamSqlExtractExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.BIGINT);
+  }
+  @Override public boolean accept() {
+    return operands.size() == 2
+        && opType(1) == SqlTypeName.BIGINT;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Long time = opValueEvaluated(1, inputRow);
+
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();
+
+    switch (unit) {
+      case YEAR:
+      case MONTH:
+      case DAY:
+        Long timeByDay = time / 1000 / 3600 / 24;
+        Long extracted = DateTimeUtils.unixDateExtract(
+            unit,
+            timeByDay
+        );
+        return BeamSqlPrimitive.of(outputType, extracted);
+
+      case DOY:
+      case DOW:
+      case WEEK:
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(new Date(time));
+        return BeamSqlPrimitive.of(outputType, (long) calendar.get(typeMapping.get(unit)));
+
+      case QUARTER:
+        calendar = Calendar.getInstance();
+        calendar.setTime(new Date(time));
+        long ret = calendar.get(Calendar.MONTH) / 3;
+        if (ret * 3 < calendar.get(Calendar.MONTH)) {
+          ret += 1;
+        }
+        return BeamSqlPrimitive.of(outputType, ret);
+
+      default:
+        throw new UnsupportedOperationException(
+            "Extract for time unit: " + unit + " not supported!");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java
new file mode 100644
index 0000000..d3cc98f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * date functions.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.date;

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
new file mode 100644
index 0000000..5f6abe0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.logical;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'AND' operation.
+ */
+public class BeamSqlAndExpression extends BeamSqlLogicalExpression {
+  public BeamSqlAndExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    boolean result = true;
+    for (BeamSqlExpression exp : operands) {
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
+      result = result && expOut.getValue();
+      if (!result) {
+        break;
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
+  }
+
+}