You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:58 UTC
[35/59] beam git commit: rename package org.apache.beam.dsls.sql to
org.apache.beam.sdk.extensions.sql
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
new file mode 100644
index 0000000..fc137da
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * Arithmetic expression for the SQL modulo ({@code %}) operator.
+ *
+ * <p>The output type of the expression is the output type of the operand at index 1.
+ */
+public class BeamSqlModExpression extends BeamSqlArithmeticExpression {
+  /**
+   * Creates a modulo expression.
+   *
+   * @param operands the operands of the expression; the operand at index 1 determines the
+   *     output type, so the list must hold at least two elements
+   */
+  public BeamSqlModExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(1).getOutputType());
+  }
+
+  /**
+   * Returns the remainder of {@code left} divided by {@code right}.
+   *
+   * <p>The remainder is computed with exact {@link BigDecimal} arithmetic. Round-tripping
+   * through {@code double} silently loses precision once an operand needs more than 53
+   * significant bits (large {@code BIGINT} or high-precision {@code DECIMAL} values). As with
+   * Java's {@code %} operator, the result carries the sign of {@code left}.
+   *
+   * @param left the value being divided
+   * @param right the divisor
+   * @return {@code left} modulo {@code right}
+   * @throws ArithmeticException if {@code right} is zero
+   */
+  @Override
+  protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.remainder(right);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
new file mode 100644
index 0000000..7ea974c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * Arithmetic expression implementing the SQL multiplication ({@code *}) operator.
+ */
+public class BeamSqlMultiplyExpression extends BeamSqlArithmeticExpression {
+
+  /**
+   * Builds a multiplication over {@code operands}, which are handed to the parent class as-is.
+   */
+  public BeamSqlMultiplyExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Multiplies the two values.
+   *
+   * @return the product of {@code left} and {@code right}
+   */
+  @Override
+  protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    final BigDecimal product = left.multiply(right);
+    return product;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
new file mode 100644
index 0000000..3ce806f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * Arithmetic expression implementing the SQL addition ({@code +}) operator.
+ */
+public class BeamSqlPlusExpression extends BeamSqlArithmeticExpression {
+
+  /**
+   * Builds an addition over {@code operands}, which are handed to the parent class as-is.
+   */
+  public BeamSqlPlusExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Adds the two values.
+   *
+   * @return the sum of {@code left} and {@code right}
+   */
+  @Override
+  protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    final BigDecimal sum = left.add(right);
+    return sum;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java
new file mode 100644
index 0000000..5f8d649
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Arithmetic operators.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
new file mode 100644
index 0000000..9b6b527
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class of the binary comparison operators.
+ *
+ * <p>{@link #evaluate(BeamSqlRow)} evaluates both operands, selects one of the typed
+ * {@code compare} overloads from the output type of the <em>left</em> operand and wraps the
+ * answer in a {@link SqlTypeName#BOOLEAN} primitive.
+ *
+ * <p>The concrete operators are {@link BeamSqlEqualsExpression},
+ * {@link BeamSqlNotEqualsExpression}, {@link BeamSqlLessThanExpression},
+ * {@link BeamSqlLessThanOrEqualsExpression}, {@link BeamSqlGreaterThanExpression} and
+ * {@link BeamSqlGreaterThanOrEqualsExpression}.
+ */
+public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
+
+  /**
+   * Creates a comparison over {@code operands}; the output type is always
+   * {@link SqlTypeName#BOOLEAN}.
+   */
+  public BeamSqlCompareExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * A comparison is accepted only when it has exactly two operands.
+   */
+  @Override
+  public boolean accept() {
+    final int operandCount = operands.size();
+    return operandCount == 2;
+  }
+
+  /**
+   * Evaluates both operands against {@code inputRow} and compares the resulting values.
+   *
+   * @throws UnsupportedOperationException if the left operand's output type is neither
+   *     numeric, {@code BOOLEAN} nor {@code VARCHAR}
+   */
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    final BeamSqlExpression leftOperand = operands.get(0);
+    final Object lhs = leftOperand.evaluate(inputRow).getValue();
+    final Object rhs = operands.get(1).evaluate(inputRow).getValue();
+
+    // Only the left operand's type drives the dispatch; the right value is cast to match.
+    final Boolean outcome;
+    switch (leftOperand.getOutputType()) {
+      case BIGINT:
+      case DECIMAL:
+      case DOUBLE:
+      case FLOAT:
+      case INTEGER:
+      case SMALLINT:
+      case TINYINT:
+        outcome = compare((Number) lhs, (Number) rhs);
+        break;
+      case BOOLEAN:
+        outcome = compare((Boolean) lhs, (Boolean) rhs);
+        break;
+      case VARCHAR:
+        outcome = compare((CharSequence) lhs, (CharSequence) rhs);
+        break;
+      default:
+        throw new UnsupportedOperationException(toString());
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, outcome);
+  }
+
+  /**
+   * Compares two character values; used for {@link SqlTypeName#VARCHAR} operands.
+   */
+  public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue);
+
+  /**
+   * Compares two boolean values; used for {@link SqlTypeName#BOOLEAN} operands.
+   */
+  public abstract Boolean compare(Boolean leftValue, Boolean rightValue);
+
+  /**
+   * Compares two numeric values; used for {@link SqlTypeName#BIGINT},
+   * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT},
+   * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and
+   * {@link SqlTypeName#TINYINT} operands.
+   */
+  public abstract Boolean compare(Number leftValue, Number rightValue);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
new file mode 100644
index 0000000..b9767e3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code =} operation.
+ *
+ * <p>Null handling is the same for every operand type: two {@code null} values are reported
+ * as equal, and a {@code null} never equals a non-null value.
+ */
+public class BeamSqlEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Tests two character values for equal content.
+   *
+   * @return {@code true} if both are {@code null} or both hold the same characters
+   */
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    if (leftValue == null || rightValue == null) {
+      // String.valueOf(null) yields the text "null", which would equal a real 'null' string.
+      return leftValue == null && rightValue == null;
+    }
+    return leftValue.toString().equals(rightValue.toString());
+  }
+
+  /**
+   * Tests two boolean values for equality.
+   *
+   * @return {@code true} if both are {@code null} or both hold the same truth value
+   */
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    if (leftValue == null || rightValue == null) {
+      // Guard before unboxing; the XOR form threw a NullPointerException here.
+      return leftValue == null && rightValue == null;
+    }
+    return leftValue.booleanValue() == rightValue.booleanValue();
+  }
+
+  /**
+   * Tests two numeric values for equality without narrowing them to {@code float}.
+   *
+   * <p>Narrowing everything to {@code float} made distinct values such as 16777216 and
+   * 16777217 compare equal. Single precision is kept only when one side actually is a
+   * {@link Float}, so that a {@code FLOAT} 0.1 still matches a {@code DOUBLE} or
+   * {@code DECIMAL} 0.1.
+   *
+   * @return {@code true} if both are {@code null} or both represent the same number
+   */
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    if (leftValue == null || rightValue == null) {
+      return leftValue == null && rightValue == null;
+    }
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() == rightValue.floatValue();
+    }
+    if (isIntegral(leftValue) && isIntegral(rightValue)) {
+      return leftValue.longValue() == rightValue.longValue();
+    }
+    if (leftValue instanceof java.math.BigDecimal && rightValue instanceof java.math.BigDecimal) {
+      // compareTo ignores scale, so 1.0 and 1.00 are equal.
+      return ((java.math.BigDecimal) leftValue).compareTo((java.math.BigDecimal) rightValue) == 0;
+    }
+    return leftValue.doubleValue() == rightValue.doubleValue();
+  }
+
+  /** Returns whether {@code value} is one of the boxed integer types that fit in a long. */
+  private static boolean isIntegral(Number value) {
+    return value instanceof Long || value instanceof Integer
+        || value instanceof Short || value instanceof Byte;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
new file mode 100644
index 0000000..5fdf27b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code >} operation.
+ *
+ * <p>A {@code null} operand never satisfies the comparison: the character and numeric
+ * overloads answer {@code false} whenever either side is {@code null}.
+ */
+public class BeamSqlGreaterThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlGreaterThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Tests whether {@code leftValue} sorts strictly after {@code rightValue}, using
+   * {@link String#compareTo(String)} ordering.
+   *
+   * @return {@code false} if either value is {@code null}
+   */
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    if (leftValue == null || rightValue == null) {
+      // String.valueOf(null) yields the text "null", which would then be ordered like data.
+      return false;
+    }
+    return leftValue.toString().compareTo(rightValue.toString()) > 0;
+  }
+
+  /**
+   * Ordering is not defined for boolean operands.
+   *
+   * @throws IllegalArgumentException always
+   */
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("> is not supported for Boolean.");
+  }
+
+  /**
+   * Tests whether {@code leftValue} is strictly greater than {@code rightValue} without
+   * narrowing them to {@code float}.
+   *
+   * <p>Narrowing everything to {@code float} made 16777217 appear not greater than 16777216.
+   * Single precision is kept only when one side actually is a {@link Float}.
+   *
+   * @return {@code false} if either value is {@code null}; two {@code null} values used to
+   *     yield {@code true}, which cannot hold for a strict comparison
+   */
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    if (leftValue == null || rightValue == null) {
+      return false;
+    }
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() > rightValue.floatValue();
+    }
+    if (isIntegral(leftValue) && isIntegral(rightValue)) {
+      return leftValue.longValue() > rightValue.longValue();
+    }
+    if (leftValue instanceof java.math.BigDecimal && rightValue instanceof java.math.BigDecimal) {
+      return ((java.math.BigDecimal) leftValue).compareTo((java.math.BigDecimal) rightValue) > 0;
+    }
+    return leftValue.doubleValue() > rightValue.doubleValue();
+  }
+
+  /** Returns whether {@code value} is one of the boxed integer types that fit in a long. */
+  private static boolean isIntegral(Number value) {
+    return value instanceof Long || value instanceof Integer
+        || value instanceof Short || value instanceof Byte;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
new file mode 100644
index 0000000..ae22054
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code >=} operation.
+ *
+ * <p>Null handling in the character and numeric overloads: two {@code null} values are
+ * treated as equal and therefore satisfy the comparison; a single {@code null} never does.
+ */
+public class BeamSqlGreaterThanOrEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlGreaterThanOrEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Tests whether {@code leftValue} sorts after or equal to {@code rightValue}, using
+   * {@link String#compareTo(String)} ordering.
+   *
+   * @return {@code true} if both are {@code null}, {@code false} if exactly one is
+   */
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    if (leftValue == null || rightValue == null) {
+      // String.valueOf(null) yields the text "null", which would then be ordered like data.
+      return leftValue == null && rightValue == null;
+    }
+    return leftValue.toString().compareTo(rightValue.toString()) >= 0;
+  }
+
+  /**
+   * Ordering is not defined for boolean operands.
+   *
+   * @throws IllegalArgumentException always
+   */
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException(">= is not supported for Boolean.");
+  }
+
+  /**
+   * Tests whether {@code leftValue} is greater than or equal to {@code rightValue} without
+   * narrowing them to {@code float}.
+   *
+   * <p>Narrowing everything to {@code float} made 16777216 appear to be at least 16777217.
+   * Single precision is kept only when one side actually is a {@link Float}.
+   *
+   * @return {@code true} if both are {@code null}, {@code false} if exactly one is
+   */
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    if (leftValue == null || rightValue == null) {
+      return leftValue == null && rightValue == null;
+    }
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() >= rightValue.floatValue();
+    }
+    if (isIntegral(leftValue) && isIntegral(rightValue)) {
+      return leftValue.longValue() >= rightValue.longValue();
+    }
+    if (leftValue instanceof java.math.BigDecimal && rightValue instanceof java.math.BigDecimal) {
+      return ((java.math.BigDecimal) leftValue).compareTo((java.math.BigDecimal) rightValue) >= 0;
+    }
+    return leftValue.doubleValue() >= rightValue.doubleValue();
+  }
+
+  /** Returns whether {@code value} is one of the boxed integer types that fit in a long. */
+  private static boolean isIntegral(Number value) {
+    return value instanceof Long || value instanceof Integer
+        || value instanceof Short || value instanceof Byte;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
new file mode 100644
index 0000000..78660cb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Unary {@code BeamSqlExpression} backing the SQL {@code IS NOT NULL} predicate.
+ */
+public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
+
+  /**
+   * Wraps {@code operand} as the single operand of the predicate; the output type is
+   * {@link SqlTypeName#BOOLEAN}.
+   */
+  public BeamSqlIsNotNullExpression(BeamSqlExpression operand) {
+    super(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * The predicate is accepted only when it has exactly one operand.
+   */
+  @Override
+  public boolean accept() {
+    final int operandCount = operands.size();
+    return operandCount == 1;
+  }
+
+  /**
+   * Evaluates the operand against {@code inputRow} and reports whether it produced a value.
+   *
+   * @return a {@code BOOLEAN} primitive that is {@code true} when the operand's value is not
+   *     {@code null}
+   */
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    final boolean hasValue = operands.get(0).evaluate(inputRow).getValue() != null;
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, hasValue);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
new file mode 100644
index 0000000..013d8d7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Unary {@code BeamSqlExpression} backing the SQL {@code IS NULL} predicate.
+ */
+public class BeamSqlIsNullExpression extends BeamSqlExpression {
+
+  /**
+   * Wraps {@code operand} as the single operand of the predicate; the output type is
+   * {@link SqlTypeName#BOOLEAN}.
+   */
+  public BeamSqlIsNullExpression(BeamSqlExpression operand) {
+    super(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * The predicate is accepted only when it has exactly one operand.
+   */
+  @Override
+  public boolean accept() {
+    final int operandCount = operands.size();
+    return operandCount == 1;
+  }
+
+  /**
+   * Evaluates the operand against {@code inputRow} and reports whether it produced no value.
+   *
+   * @return a {@code BOOLEAN} primitive that is {@code true} when the operand's value is
+   *     {@code null}
+   */
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    final boolean isMissing = operands.get(0).evaluate(inputRow).getValue() == null;
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, isMissing);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
new file mode 100644
index 0000000..a6e5cd9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code <} operation.
+ *
+ * <p>A {@code null} operand never satisfies the comparison: the character and numeric
+ * overloads answer {@code false} whenever either side is {@code null}.
+ */
+public class BeamSqlLessThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Tests whether {@code leftValue} sorts strictly before {@code rightValue}, using
+   * {@link String#compareTo(String)} ordering.
+   *
+   * @return {@code false} if either value is {@code null}
+   */
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    if (leftValue == null || rightValue == null) {
+      // String.valueOf(null) yields the text "null", which would then be ordered like data.
+      return false;
+    }
+    return leftValue.toString().compareTo(rightValue.toString()) < 0;
+  }
+
+  /**
+   * Ordering is not defined for boolean operands.
+   *
+   * @throws IllegalArgumentException always
+   */
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("< is not supported for Boolean.");
+  }
+
+  /**
+   * Tests whether {@code leftValue} is strictly less than {@code rightValue} without
+   * narrowing them to {@code float}.
+   *
+   * <p>Narrowing everything to {@code float} made 16777216 appear not less than 16777217.
+   * Single precision is kept only when one side actually is a {@link Float}.
+   *
+   * @return {@code false} if either value is {@code null}; two {@code null} values used to
+   *     yield {@code true}, which cannot hold for a strict comparison
+   */
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    if (leftValue == null || rightValue == null) {
+      return false;
+    }
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() < rightValue.floatValue();
+    }
+    if (isIntegral(leftValue) && isIntegral(rightValue)) {
+      return leftValue.longValue() < rightValue.longValue();
+    }
+    if (leftValue instanceof java.math.BigDecimal && rightValue instanceof java.math.BigDecimal) {
+      return ((java.math.BigDecimal) leftValue).compareTo((java.math.BigDecimal) rightValue) < 0;
+    }
+    return leftValue.doubleValue() < rightValue.doubleValue();
+  }
+
+  /** Returns whether {@code value} is one of the boxed integer types that fit in a long. */
+  private static boolean isIntegral(Number value) {
+    return value instanceof Long || value instanceof Integer
+        || value instanceof Short || value instanceof Byte;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
new file mode 100644
index 0000000..52a604c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for the {@code <=} operation.
+ *
+ * <p>Null handling in the character and numeric overloads: two {@code null} values are
+ * treated as equal and therefore satisfy the comparison; a single {@code null} never does.
+ */
+public class BeamSqlLessThanOrEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanOrEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Tests whether {@code leftValue} sorts before or equal to {@code rightValue}, using
+   * {@link String#compareTo(String)} ordering.
+   *
+   * @return {@code true} if both are {@code null}, {@code false} if exactly one is
+   */
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    if (leftValue == null || rightValue == null) {
+      // String.valueOf(null) yields the text "null", which would then be ordered like data.
+      return leftValue == null && rightValue == null;
+    }
+    return leftValue.toString().compareTo(rightValue.toString()) <= 0;
+  }
+
+  /**
+   * Ordering is not defined for boolean operands.
+   *
+   * @throws IllegalArgumentException always
+   */
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new IllegalArgumentException("<= is not supported for Boolean.");
+  }
+
+  /**
+   * Tests whether {@code leftValue} is less than or equal to {@code rightValue} without
+   * narrowing them to {@code float}.
+   *
+   * <p>Narrowing everything to {@code float} made 16777217 appear to be at most 16777216.
+   * Single precision is kept only when one side actually is a {@link Float}.
+   *
+   * @return {@code true} if both are {@code null}, {@code false} if exactly one is
+   */
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    if (leftValue == null || rightValue == null) {
+      return leftValue == null && rightValue == null;
+    }
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() <= rightValue.floatValue();
+    }
+    if (isIntegral(leftValue) && isIntegral(rightValue)) {
+      return leftValue.longValue() <= rightValue.longValue();
+    }
+    if (leftValue instanceof java.math.BigDecimal && rightValue instanceof java.math.BigDecimal) {
+      return ((java.math.BigDecimal) leftValue).compareTo((java.math.BigDecimal) rightValue) <= 0;
+    }
+    return leftValue.doubleValue() <= rightValue.doubleValue();
+  }
+
+  /** Returns whether {@code value} is one of the boxed integer types that fit in a long. */
+  private static boolean isIntegral(Number value) {
+    return value instanceof Long || value instanceof Integer
+        || value instanceof Short || value instanceof Byte;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
new file mode 100644
index 0000000..1c5b072
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * {@code BeamSqlExpression} for {@code <>} operation.
+ */
+public class BeamSqlNotEqualsExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlNotEqualsExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Two character values differ when their string forms are not identical.
+   */
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) != 0;
+  }
+
+  /**
+   * Two booleans differ exactly when one is {@code true} and the other is {@code false}.
+   */
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    return leftValue ^ rightValue;
+  }
+
+  /**
+   * Compares two numeric operands for inequality.
+   *
+   * <p>Returns {@code false} whenever an operand is {@code null}: two {@code null}s are not
+   * different from each other, and a {@code null} compared with a value keeps the previous
+   * result of {@code false}.
+   */
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    // The former null check was taken over from an equality-style comparison and made
+    // "NULL <> NULL" evaluate to true.
+    if (leftValue == null || rightValue == null) {
+      return false;
+    }
+    // floatValue() only keeps 24 bits of mantissa, so distinct large integers (or close
+    // doubles) looked equal. Compare integral values exactly, the rest as double.
+    if (isIntegral(leftValue) && isIntegral(rightValue)) {
+      return leftValue.longValue() != rightValue.longValue();
+    }
+    return leftValue.doubleValue() != rightValue.doubleValue();
+  }
+
+  /** Whether {@code value} is one of the boxed integer types that fit in a {@code long}. */
+  private static boolean isIntegral(Number value) {
+    return value instanceof Long
+        || value instanceof Integer
+        || value instanceof Short
+        || value instanceof Byte;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/package-info.java
new file mode 100644
index 0000000..94ed727
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/comparison/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Comparison operators.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
new file mode 100644
index 0000000..e3d6cc8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Collections;
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for CURRENT_DATE.
+ *
+ * <p>Takes no operands and evaluates to a freshly created {@link Date}, typed as DATE.
+ */
+public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
+  public BeamSqlCurrentDateExpression() {
+    super(Collections.<BeamSqlExpression>emptyList(), SqlTypeName.DATE);
+  }
+
+  /** Valid only when there are no operands. */
+  @Override public boolean accept() {
+    return getOperands().isEmpty();
+  }
+
+  /** The input row is not consulted; the result is the moment of evaluation. */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date now = new Date();
+    return BeamSqlPrimitive.of(outputType, now);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
new file mode 100644
index 0000000..edabe53
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.TimeZone;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for LOCALTIME and CURRENT_TIME.
+ *
+ * <p>Evaluates to the current moment as a {@link GregorianCalendar} in the JVM default time
+ * zone, typed as TIME.
+ *
+ * <p>NOTE: an optional {@code precision} operand is accepted but ignored.
+ */
+public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
+  public BeamSqlCurrentTimeExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIME);
+  }
+
+  /** Accepts zero operands, or a single one (the ignored precision). */
+  @Override public boolean accept() {
+    return getOperands().size() <= 1;
+  }
+
+  /** The input row is not consulted; the result is the moment of evaluation. */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    GregorianCalendar now = new GregorianCalendar(TimeZone.getDefault());
+    now.setTime(new Date());
+    return BeamSqlPrimitive.of(outputType, now);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
new file mode 100644
index 0000000..73174b3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for LOCALTIMESTAMP and CURRENT_TIMESTAMP.
+ *
+ * <p>Evaluates to a freshly created {@link Date}, typed as TIMESTAMP.
+ *
+ * <p>NOTE: an optional {@code precision} operand is accepted but ignored.
+ */
+public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression {
+  public BeamSqlCurrentTimestampExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIMESTAMP);
+  }
+
+  /** Accepts zero operands, or a single one (the ignored precision). */
+  @Override public boolean accept() {
+    return getOperands().size() <= 1;
+  }
+
+  /** The input row is not consulted; the result is the moment of evaluation. */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date now = new Date();
+    return BeamSqlPrimitive.of(outputType, now);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
new file mode 100644
index 0000000..e575d6e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for CEIL(date).
+ *
+ * <p>Operand 0 is the date to round up, operand 1 is a SYMBOL carrying the
+ * {@link TimeUnitRange} to round to.
+ *
+ * <p>NOTE: only support CEIL for {@link TimeUnitRange#YEAR} and {@link TimeUnitRange#MONTH}.
+ */
+public class BeamSqlDateCeilExpression extends BeamSqlExpression {
+  public BeamSqlDateCeilExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIMESTAMP);
+  }
+
+  /** Requires exactly two operands, the second one being a SYMBOL. */
+  @Override public boolean accept() {
+    if (operands.size() != 2) {
+      return false;
+    }
+    return opType(1) == SqlTypeName.SYMBOL;
+  }
+
+  /** Rounds the evaluated date up to the requested unit via {@link DateTimeUtils}. */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date input = opValueEvaluated(0, inputRow);
+    long inputMillis = input.getTime();
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
+
+    long ceiledMillis = DateTimeUtils.unixTimestampCeil(unit, inputMillis);
+    return BeamSqlPrimitive.of(outputType, new Date(ceiledMillis));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
new file mode 100644
index 0000000..4bad353
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for FLOOR(date).
+ *
+ * <p>Operand 0 is the date to round down, operand 1 is a SYMBOL carrying the
+ * {@link TimeUnitRange} to round to.
+ *
+ * <p>NOTE: only support FLOOR for {@link TimeUnitRange#YEAR} and {@link TimeUnitRange#MONTH}.
+ */
+public class BeamSqlDateFloorExpression extends BeamSqlExpression {
+  public BeamSqlDateFloorExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DATE);
+  }
+
+  /** Requires exactly two operands, the second one being a SYMBOL. */
+  @Override public boolean accept() {
+    if (operands.size() != 2) {
+      return false;
+    }
+    return opType(1) == SqlTypeName.SYMBOL;
+  }
+
+  /** Rounds the evaluated date down to the requested unit via {@link DateTimeUtils}. */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Date input = opValueEvaluated(0, inputRow);
+    long inputMillis = input.getTime();
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
+
+    long flooredMillis = DateTimeUtils.unixTimestampFloor(unit, inputMillis);
+    return BeamSqlPrimitive.of(outputType, new Date(flooredMillis));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlExtractExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlExtractExpression.java
new file mode 100644
index 0000000..a7f3071
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for EXTRACT.
+ *
+ * <p>Operand 0 carries the {@link TimeUnitRange} to extract, operand 1 is the BIGINT
+ * timestamp in milliseconds since the epoch. The result is a BIGINT.
+ *
+ * <p>The following date functions also implicitly converted to {@code EXTRACT}:
+ * <ul>
+ *   <li>YEAR(date) =&gt; EXTRACT(YEAR FROM date)</li>
+ *   <li>MONTH(date) =&gt; EXTRACT(MONTH FROM date)</li>
+ *   <li>DAY(date) =&gt; EXTRACT(DAY FROM date)</li>
+ *   <li>QUARTER(date) =&gt; EXTRACT(QUARTER FROM date)</li>
+ *   <li>WEEK(date) =&gt; EXTRACT(WEEK FROM date)</li>
+ *   <li>DAYOFYEAR(date) =&gt; EXTRACT(DOY FROM date)</li>
+ *   <li>DAYOFMONTH(date) =&gt; EXTRACT(DAY FROM date)</li>
+ *   <li>DAYOFWEEK(date) =&gt; EXTRACT(DOW FROM date)</li>
+ * </ul>
+ */
+public class BeamSqlExtractExpression extends BeamSqlExpression {
+  private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;
+
+  /** Time units that are read straight from a {@link Calendar} field. */
+  private static final Map<TimeUnitRange, Integer> typeMapping = new HashMap<>();
+  static {
+    typeMapping.put(TimeUnitRange.DOW, Calendar.DAY_OF_WEEK);
+    typeMapping.put(TimeUnitRange.DOY, Calendar.DAY_OF_YEAR);
+    typeMapping.put(TimeUnitRange.WEEK, Calendar.WEEK_OF_YEAR);
+  }
+
+  public BeamSqlExtractExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.BIGINT);
+  }
+
+  /** Requires exactly two operands, the second one being a BIGINT timestamp. */
+  @Override public boolean accept() {
+    return operands.size() == 2
+        && opType(1) == SqlTypeName.BIGINT;
+  }
+
+  /**
+   * Extracts the requested unit from the evaluated timestamp.
+   *
+   * @throws UnsupportedOperationException for any unit other than YEAR, MONTH, DAY, DOY,
+   *     DOW, WEEK and QUARTER
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Long time = opValueEvaluated(1, inputRow);
+
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();
+
+    switch (unit) {
+      case YEAR:
+      case MONTH:
+      case DAY:
+        // unixDateExtract works on days since the epoch.
+        Long extracted = DateTimeUtils.unixDateExtract(unit, epochDay(time));
+        return BeamSqlPrimitive.of(outputType, extracted);
+
+      case DOY:
+      case DOW:
+      case WEEK:
+        // NOTE: Calendar.getInstance() uses the JVM default time zone and locale, whereas
+        // the YEAR/MONTH/DAY branch above is computed from plain epoch days.
+        Calendar fieldCalendar = calendarAt(time);
+        return BeamSqlPrimitive.of(outputType, (long) fieldCalendar.get(typeMapping.get(unit)));
+
+      case QUARTER:
+        // Calendar.MONTH is zero-based (JANUARY == 0), so months 0-2 form quarter 1,
+        // 3-5 quarter 2, and so on. The previous ceil-style computation returned 0 for
+        // January and was one too low for April, July and October.
+        long quarter = calendarAt(time).get(Calendar.MONTH) / 3 + 1;
+        return BeamSqlPrimitive.of(outputType, quarter);
+
+      default:
+        throw new UnsupportedOperationException(
+            "Extract for time unit: " + unit + " not supported!");
+    }
+  }
+
+  /**
+   * Converts epoch milliseconds to whole days since the epoch, rounding towards negative
+   * infinity so that instants before 1970-01-01 fall on the correct day.
+   */
+  private static long epochDay(long millis) {
+    long days = millis / MILLIS_PER_DAY;
+    if (millis % MILLIS_PER_DAY < 0) {
+      days--;
+    }
+    return days;
+  }
+
+  /** Creates a default-zone, default-locale calendar positioned at {@code millis}. */
+  private static Calendar calendarAt(long millis) {
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTime(new Date(millis));
+    return calendar;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/package-info.java
new file mode 100644
index 0000000..1ccd9d6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/date/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Date functions.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.date;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlAndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlAndExpression.java
new file mode 100644
index 0000000..eca945b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.logical;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'AND' operation.
+ */
+public class BeamSqlAndExpression extends BeamSqlLogicalExpression {
+  public BeamSqlAndExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Evaluates the operands from left to right and stops at the first one that is false.
+   *
+   * @return FALSE as soon as one operand is false, TRUE when none is (including when
+   *     there are no operands)
+   */
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    for (BeamSqlExpression operand : operands) {
+      BeamSqlPrimitive<Boolean> evaluated = operand.evaluate(inputRow);
+      if (!evaluated.getValue()) {
+        // Short-circuit: the remaining operands are not evaluated.
+        return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false);
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
new file mode 100644
index 0000000..3d2e050
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.logical;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for Logical operators.
+ *
+ * <p>Every logical expression produces a BOOLEAN and only accepts BOOLEAN operands.
+ */
+public abstract class BeamSqlLogicalExpression extends BeamSqlExpression {
+  private BeamSqlLogicalExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlLogicalExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  /** Accepts the expression only when every operand has output type BOOLEAN. */
+  @Override
+  public boolean accept() {
+    for (BeamSqlExpression operand : operands) {
+      boolean isBoolean = operand.getOutputType().equals(SqlTypeName.BOOLEAN);
+      if (!isBoolean) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlNotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlNotExpression.java
new file mode 100644
index 0000000..521b340
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.logical;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for logical operator: NOT.
+ *
+ * <p>Negates its single BOOLEAN operand; a {@code null} (UNKNOWN) operand stays
+ * {@code null}.
+ */
+public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
+  public BeamSqlNotExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /** Requires exactly one operand, which must also pass the parent's BOOLEAN check. */
+  @Override
+  public boolean accept() {
+    return numberOfOperands() == 1 && super.accept();
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Boolean operand = opValueEvaluated(0, inputRow);
+    if (operand == null) {
+      // NOT UNKNOWN is UNKNOWN.
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, !operand);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlOrExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlOrExpression.java
new file mode 100644
index 0000000..a9d8e8a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.logical;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'OR' operation.
+ */
+public class BeamSqlOrExpression extends BeamSqlLogicalExpression {
+ public BeamSqlOrExpression(List<BeamSqlExpression> operands) {
+ super(operands);
+ }
+
+ @Override
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+ boolean result = false;
+ for (BeamSqlExpression exp : operands) {
+ BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
+ result = result || expOut.getValue();
+ if (result) {
+ break;
+ }
+ }
+ return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/package-info.java
new file mode 100644
index 0000000..b7ef1ba
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/logical/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Logical operators.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.logical;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAbsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAbsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAbsExpression.java
new file mode 100644
index 0000000..0a68563
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAbsExpression.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ABS' function.
+ *
+ * <p>The output type passed to the parent is the output type of the first operand.
+ */
+public class BeamSqlAbsExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlAbsExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(0).getOutputType());
+  }
+
+  /**
+   * Applies {@code SqlFunctions.abs} to the operand, keeping the operand's own type.
+   *
+   * @param op the evaluated operand
+   * @return a primitive of the same SQL type as {@code op}, or {@code null} when the
+   *     operand's type is not one of the numeric types handled below
+   */
+  @Override
+  public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    switch (op.getOutputType()) {
+      case TINYINT:
+        return BeamSqlPrimitive.of(SqlTypeName.TINYINT, SqlFunctions.abs(op.getByte()));
+      case SMALLINT:
+        return BeamSqlPrimitive.of(SqlTypeName.SMALLINT, SqlFunctions.abs(op.getShort()));
+      case INTEGER:
+        return BeamSqlPrimitive.of(SqlTypeName.INTEGER, SqlFunctions.abs(op.getInteger()));
+      case BIGINT:
+        return BeamSqlPrimitive.of(SqlTypeName.BIGINT, SqlFunctions.abs(op.getLong()));
+      case FLOAT:
+        return BeamSqlPrimitive.of(SqlTypeName.FLOAT, SqlFunctions.abs(op.getFloat()));
+      case DOUBLE:
+        return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions.abs(op.getDouble()));
+      case DECIMAL: {
+        // The decimal is rebuilt from the value's string form before taking abs.
+        BigDecimal decimal = new BigDecimal(op.getValue().toString());
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, SqlFunctions.abs(decimal));
+      }
+      default:
+        // Any other operand type produces no result.
+        return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAcosExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAcosExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAcosExpression.java
new file mode 100644
index 0000000..a49d72a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAcosExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ACOS' function.
+ */
+public class BeamSqlAcosExpression extends BeamSqlMathUnaryExpression {
+
+ public BeamSqlAcosExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.DOUBLE);
+ }
+
+ @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DOUBLE, SqlFunctions.acos(SqlFunctions.toDouble(op.getValue())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAsinExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAsinExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAsinExpression.java
new file mode 100644
index 0000000..557ec8d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAsinExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ASIN' function.
+ */
+public class BeamSqlAsinExpression extends BeamSqlMathUnaryExpression {
+
+ public BeamSqlAsinExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.DOUBLE);
+ }
+
+ @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DOUBLE, SqlFunctions.asin(SqlFunctions.toDouble(op.getValue())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtan2Expression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtan2Expression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtan2Expression.java
new file mode 100644
index 0000000..4e11b42
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtan2Expression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@link BeamSqlMathBinaryExpression} for 'ATAN2' function.
+ */
+public class BeamSqlAtan2Expression extends BeamSqlMathBinaryExpression {
+
+ public BeamSqlAtan2Expression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.DOUBLE);
+ }
+
+ @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+ BeamSqlPrimitive rightOp) {
+ return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions
+ .atan2(SqlFunctions.toDouble(leftOp.getValue()),
+ SqlFunctions.toDouble(rightOp.getValue())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtanExpression.java
new file mode 100644
index 0000000..0991252
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlAtanExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ATAN' function.
+ */
+public class BeamSqlAtanExpression extends BeamSqlMathUnaryExpression {
+
+ public BeamSqlAtanExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.DOUBLE);
+ }
+
+ @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DOUBLE, SqlFunctions.atan(SqlFunctions.toDouble(op.getValue())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCeilExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCeilExpression.java
new file mode 100644
index 0000000..a3cb9c8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCeilExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'CEIL' function.
+ */
+public class BeamSqlCeilExpression extends BeamSqlMathUnaryExpression {
+
+ public BeamSqlCeilExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.DOUBLE);
+ }
+
+ @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+ switch (getOutputType()) {
+ case DECIMAL:
+ return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, SqlFunctions.ceil(op.getDecimal()));
+ default:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DOUBLE, SqlFunctions.ceil(SqlFunctions.toDouble(op.getValue())));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCosExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCosExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCosExpression.java
new file mode 100644
index 0000000..6ddd079
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCosExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'COS' function.
+ */
+public class BeamSqlCosExpression extends BeamSqlMathUnaryExpression {
+
+ public BeamSqlCosExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.DOUBLE);
+ }
+
+ @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DOUBLE, SqlFunctions.cos(SqlFunctions.toDouble(op.getValue())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCotExpression.java
new file mode 100644
index 0000000..9dfbd90
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlCotExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'COT' function.
+ */
+public class BeamSqlCotExpression extends BeamSqlMathUnaryExpression {
+
+ public BeamSqlCotExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.DOUBLE);
+ }
+
+ @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DOUBLE, SqlFunctions.cot(SqlFunctions.toDouble(op.getValue())));
+ }
+}