You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:19 UTC
[56/59] beam git commit: move all implementation classes/packages
into impl package
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlCotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlCotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlCotExpression.java
new file mode 100644
index 0000000..68d56b5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlCotExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} implementing the SQL 'COT' function with a DOUBLE result.
+ */
+public class BeamSqlCotExpression extends BeamSqlMathUnaryExpression {
+  /** Passes the operands to the base class and declares {@code DOUBLE} as the output type. */
+  public BeamSqlCotExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+  /** Converts the operand's value to a double and wraps {@code SqlFunctions.cot} of it. */
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    double operand = SqlFunctions.toDouble(op.getValue());
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions.cot(operand));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlDegreesExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlDegreesExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlDegreesExpression.java
new file mode 100644
index 0000000..de4eac2
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlDegreesExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} implementing the SQL 'DEGREES' function (DOUBLE result).
+ */
+public class BeamSqlDegreesExpression extends BeamSqlMathUnaryExpression {
+  /** Hands the operands to the base class with {@code DOUBLE} as the declared output type. */
+  public BeamSqlDegreesExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+  /** Reads the operand's value as a double and wraps {@code SqlFunctions.degrees} of it. */
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    double value = SqlFunctions.toDouble(op.getValue());
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions.degrees(value));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlExpExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlExpExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlExpExpression.java
new file mode 100644
index 0000000..a789355
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlExpExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} implementing the SQL 'EXP' function with a DOUBLE result.
+ */
+public class BeamSqlExpExpression extends BeamSqlMathUnaryExpression {
+  /** Registers the operands with the base class; the output type is always {@code DOUBLE}. */
+  public BeamSqlExpExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+  /** Takes the operand's value as a double and wraps {@code SqlFunctions.exp} of it. */
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    double exponent = SqlFunctions.toDouble(op.getValue());
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions.exp(exponent));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlFloorExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlFloorExpression.java
new file mode 100644
index 0000000..def50f9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlFloorExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for the 'FLOOR' function, dispatching on the output type.
+ */
+public class BeamSqlFloorExpression extends BeamSqlMathUnaryExpression {
+  // NOTE(review): DOUBLE is fixed here, so calculate's DECIMAL case looks unreachable; confirm.
+  public BeamSqlFloorExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+  /** Floors {@code op.getDecimal()} for DECIMAL output, else the value converted to a double. */
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    switch (getOutputType()) {
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, SqlFunctions.floor(op.getDecimal()));
+      default:
+        double operand = SqlFunctions.toDouble(op.getValue());
+        return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions.floor(operand));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlLnExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlLnExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlLnExpression.java
new file mode 100644
index 0000000..ea46044
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlLnExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} implementing the SQL 'LN' function with a DOUBLE result.
+ */
+public class BeamSqlLnExpression extends BeamSqlMathUnaryExpression {
+  /** Passes the operands up and fixes the declared output type to {@code DOUBLE}. */
+  public BeamSqlLnExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+  /** Converts the operand's value to a double and wraps {@code SqlFunctions.ln} of it. */
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    double argument = SqlFunctions.toDouble(op.getValue());
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions.ln(argument));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlLogExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlLogExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlLogExpression.java
new file mode 100644
index 0000000..9a99b61
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlLogExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} implementing the 'LOG10' function with a DOUBLE result.
+ */
+public class BeamSqlLogExpression extends BeamSqlMathUnaryExpression {
+  /** Supplies the operands to the base class and declares a {@code DOUBLE} output type. */
+  public BeamSqlLogExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+  /** Converts the operand's value to a double and wraps {@code SqlFunctions.log10} of it. */
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    double argument = SqlFunctions.toDouble(op.getValue());
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions.log10(argument));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
new file mode 100644
index 0000000..c12b725
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for two-operand math functions such as
+ * POWER, MOD, RAND_INTEGER, ATAN2, ROUND, TRUNCATE.
+ */
+public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression {
+  /** Forwards the operands and the declared output type to {@code BeamSqlExpression}. */
+  public BeamSqlMathBinaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+  /** Accepts exactly two operands, both of a numeric type. */
+  @Override public boolean accept() {
+    if (numberOfOperands() != 2) {
+      return false;
+    }
+    return isOperandNumeric(opType(0)) && isOperandNumeric(opType(1));
+  }
+  /** Evaluates both operands against {@code inputRow}, left first, then calls calculate. */
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+    BeamSqlExpression first = op(0);
+    BeamSqlExpression second = op(1);
+    return calculate(first.evaluate(inputRow), second.evaluate(inputRow));
+  }
+
+  /**
+   * Computes this function's result from the two already-evaluated operands.
+   *
+   * @param leftOp evaluated first operand, a {@link BeamSqlPrimitive}
+   * @param rightOp evaluated second operand, a {@link BeamSqlPrimitive}
+   * @return the result as a {@link BeamSqlPrimitive}
+   */
+  public abstract BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+      BeamSqlPrimitive rightOp);
+  /** Returns whether {@code opType} is one of {@link SqlTypeName#NUMERIC_TYPES}. */
+  public boolean isOperandNumeric(SqlTypeName opType) {
+    return SqlTypeName.NUMERIC_TYPES.contains(opType);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
new file mode 100644
index 0000000..163c40e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+
+/**
+ * Base class for single-operand math functions such as
+ * ABS, SQRT, LN, LOG10, EXP, CEIL, FLOOR, RAND, ACOS,
+ * ASIN, ATAN, COS, COT, DEGREES, RADIANS, SIGN, SIN, TAN.
+ * Subclasses supply the actual computation in {@code calculate}.
+ */
+public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression {
+
+  /** Forwards the operands and the declared output type to {@code BeamSqlExpression}. */
+  public BeamSqlMathUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /** Accepts only a single operand whose type is in {@link SqlTypeName#NUMERIC_TYPES}. */
+  @Override public boolean accept() {
+    return numberOfOperands() == 1 && SqlTypeName.NUMERIC_TYPES.contains(opType(0));
+  }
+
+  /** Evaluates operand 0 against {@code inputRow} and hands the result to calculate. */
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+    return calculate(op(0).evaluate(inputRow));
+  }
+
+  /**
+   * Computes this function's result from the already-evaluated operand.
+   *
+   * @param op the evaluated operand, as produced in {@code evaluate}
+   * @return the function result
+   */
+  public abstract BeamSqlPrimitive calculate(BeamSqlPrimitive op);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
new file mode 100644
index 0000000..dfaf546
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Expression for the 'PI' function; every evaluation yields {@code Math.PI} as a DOUBLE.
+ */
+public class BeamSqlPiExpression extends BeamSqlExpression {
+  /** Creates the expression without passing operands and sets the output type to DOUBLE. */
+  public BeamSqlPiExpression() {
+    outputType = SqlTypeName.DOUBLE;
+  }
+  /** Always accepts; no checks are performed. */
+  @Override public boolean accept() {
+    return true;
+  }
+  /** Returns {@code Math.PI} wrapped as a DOUBLE primitive; {@code inputRow} is not read. */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPowerExpression.java
new file mode 100644
index 0000000..cc58679
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPowerExpression.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathBinaryExpression} implementing the 'POWER' function with a DOUBLE result.
+ */
+public class BeamSqlPowerExpression extends BeamSqlMathBinaryExpression {
+  /** Passes the operands to the base class and declares {@code DOUBLE} as the output type. */
+  public BeamSqlPowerExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  /** Converts both operand values to doubles and wraps {@code SqlFunctions.power} of them. */
+  @Override
+  public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+      BeamSqlPrimitive rightOp) {
+    double base = SqlFunctions.toDouble(leftOp.getValue());
+    double exponent = SqlFunctions.toDouble(rightOp.getValue());
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions.power(base, exponent));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRadiansExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRadiansExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRadiansExpression.java
new file mode 100644
index 0000000..74c633d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRadiansExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} implementing the SQL 'RADIANS' function (DOUBLE result).
+ */
+public class BeamSqlRadiansExpression extends BeamSqlMathUnaryExpression {
+  /** Hands the operands to the base class with {@code DOUBLE} as the declared output type. */
+  public BeamSqlRadiansExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+  /** Reads the operand's value as a double and wraps {@code SqlFunctions.radians} of it. */
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    double value = SqlFunctions.toDouble(op.getValue());
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions.radians(value));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
new file mode 100644
index 0000000..f2d7a47
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for the 'RAND([seed])' function, backed by {@code java.util.Random}.
+ */
+public class BeamSqlRandExpression extends BeamSqlExpression {
+  private final Random rand = new Random();
+  private Integer seed = null; // seed most recently applied to rand; null until one is applied
+  /** Passes the sub-expressions on and declares {@code DOUBLE} as the output type. */
+  public BeamSqlRandExpression(List<BeamSqlExpression> subExps) {
+    super(subExps, SqlTypeName.DOUBLE);
+  }
+  /** Always accepts; the operands are not validated here. */
+  @Override public boolean accept() {
+    return true;
+  }
+  /** Returns the next random double, re-seeding only when the seed operand's value changes. */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    if (operands.size() == 1) {
+      int rowSeed = opValueEvaluated(0, inputRecord);
+      if (seed == null || seed != rowSeed) {
+        rand.setSeed(rowSeed);
+        seed = rowSeed; // remember it, otherwise every row re-seeds and repeats the same value
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, rand.nextDouble());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
new file mode 100644
index 0000000..b2e65ce
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Expression for the 'RAND_INTEGER([seed, ] numeric)' function; yields an INTEGER drawn from
+ * a {@link Random}.
+ *
+ * <p>With a single operand, that operand is the bound passed to {@link Random#nextInt(int)}.
+ * With two operands, the first is an integer seed and the second is the bound. The generator
+ * is re-seeded only when the seed differs from the one applied last, so consecutive rows
+ * carrying the same seed advance through one pseudo-random sequence.
+ */
+public class BeamSqlRandIntegerExpression extends BeamSqlExpression {
+  private Random rand = new Random();
+  /** Seed most recently applied to {@link #rand}; {@code null} until one has been applied. */
+  private Integer seed = null;
+
+  public BeamSqlRandIntegerExpression(List<BeamSqlExpression> subExps) {
+    super(subExps, SqlTypeName.INTEGER);
+  }
+
+  /** Always returns {@code true}; no operand validation is performed here. */
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  /**
+   * Returns the next pseudo-random integer below the bound operand, re-seeding first if the
+   * row supplies a new seed.
+   *
+   * @param inputRecord the row against which the operands are evaluated
+   * @return an INTEGER primitive wrapping {@code rand.nextInt(bound)}
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    int numericIdx = 0;
+    if (operands.size() == 2) {
+      int rowSeed = opValueEvaluated(0, inputRecord);
+      if (seed == null || seed != rowSeed) {
+        // Remember the seed that was applied. If it is not stored, the guard above is always
+        // true, the generator is reset on every row, and the function repeats a single value.
+        seed = rowSeed;
+        rand.setSeed(rowSeed);
+      }
+      // With a seed present, the bound moves to the second operand.
+      numericIdx = 1;
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.INTEGER,
+        rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRoundExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRoundExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRoundExpression.java
new file mode 100644
index 0000000..1725dbb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRoundExpression.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathBinaryExpression} implementing the 'ROUND' function.
+ *
+ * <p>The left operand is the number to round; the right operand is converted to an
+ * {@code int} and handed to {@code SqlFunctions.sround} together with the left value. If only
+ * one operand is supplied, an INTEGER literal 0 is appended as the second operand.
+ */
+public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression {
+
+  private final BeamSqlPrimitive zero = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0);
+
+  public BeamSqlRoundExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(0).getOutputType());
+    // ROUND(x) is treated as ROUND(x, 0): pad the operand list with the literal zero.
+    if (numberOfOperands() == 1) {
+      operands.add(1, zero);
+    }
+  }
+
+  /**
+   * Rounds {@code leftOp} using {@code rightOp} as the second argument of {@code sround}.
+   *
+   * @return a primitive of the same SQL type as {@code leftOp}, or {@code null} when that
+   *     type is not one of the numeric types handled below
+   */
+  @Override
+  public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+      BeamSqlPrimitive rightOp) {
+    switch (leftOp.getOutputType()) {
+      case TINYINT:
+        return BeamSqlPrimitive.of(SqlTypeName.TINYINT,
+            (byte) sroundInt(SqlFunctions.toInt(leftOp.getValue()), scaleOf(rightOp)));
+      case SMALLINT:
+        return BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
+            (short) sroundInt(SqlFunctions.toInt(leftOp.getValue()), scaleOf(rightOp)));
+      case INTEGER:
+        return BeamSqlPrimitive.of(SqlTypeName.INTEGER,
+            sroundInt(leftOp.getInteger(), scaleOf(rightOp)));
+      case BIGINT:
+        return BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+            sroundLong(leftOp.getLong(), scaleOf(rightOp)));
+      case FLOAT:
+        return BeamSqlPrimitive.of(SqlTypeName.FLOAT,
+            (float) sroundDouble(leftOp.getFloat(), scaleOf(rightOp)));
+      case DOUBLE:
+        return BeamSqlPrimitive.of(SqlTypeName.DOUBLE,
+            sroundDouble(leftOp.getDouble(), scaleOf(rightOp)));
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+            sroundDecimal(SqlFunctions.toBigDecimal(leftOp.getValue()), scaleOf(rightOp)));
+      default:
+        return null;
+    }
+  }
+
+  /** Converts the right operand's value to the {@code int} passed to {@code sround}. */
+  private int scaleOf(BeamSqlPrimitive rightOp) {
+    Object rawScale = rightOp.getValue();
+    return SqlFunctions.toInt(rawScale);
+  }
+
+  // The typed wrappers below pin which SqlFunctions.sround overload each case uses.
+
+  private int sroundInt(int value, int scale) {
+    return SqlFunctions.sround(value, scale);
+  }
+
+  private long sroundLong(long value, int scale) {
+    return SqlFunctions.sround(value, scale);
+  }
+
+  private double sroundDouble(double value, int scale) {
+    return SqlFunctions.sround(value, scale);
+  }
+
+  private BigDecimal sroundDecimal(BigDecimal value, int scale) {
+    return SqlFunctions.sround(value, scale);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlSignExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlSignExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlSignExpression.java
new file mode 100644
index 0000000..6be8102
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlSignExpression.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} implementing the 'SIGN' function.
+ *
+ * <p>The result carries the same SQL type as the operand.
+ */
+public class BeamSqlSignExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlSignExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(0).getOutputType());
+  }
+
+  /**
+   * Applies {@code SqlFunctions.sign} to the operand, converting it first according to its
+   * SQL type.
+   *
+   * @return a primitive of the operand's SQL type, or {@code null} when that type is not one
+   *     of the numeric types handled below
+   */
+  @Override
+  public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    switch (op.getOutputType()) {
+      case TINYINT:
+        return BeamSqlPrimitive.of(
+            SqlTypeName.TINYINT,
+            (byte) SqlFunctions.sign(SqlFunctions.toByte(op.getValue())));
+      case SMALLINT:
+        return BeamSqlPrimitive.of(
+            SqlTypeName.SMALLINT,
+            (short) SqlFunctions.sign(SqlFunctions.toShort(op.getValue())));
+      case INTEGER:
+        return BeamSqlPrimitive.of(
+            SqlTypeName.INTEGER,
+            SqlFunctions.sign(SqlFunctions.toInt(op.getValue())));
+      case BIGINT:
+        return BeamSqlPrimitive.of(
+            SqlTypeName.BIGINT,
+            SqlFunctions.sign(SqlFunctions.toLong(op.getValue())));
+      case FLOAT:
+        return BeamSqlPrimitive.of(
+            SqlTypeName.FLOAT,
+            (float) SqlFunctions.sign(SqlFunctions.toFloat(op.getValue())));
+      case DOUBLE:
+        return BeamSqlPrimitive.of(
+            SqlTypeName.DOUBLE,
+            SqlFunctions.sign(SqlFunctions.toDouble(op.getValue())));
+      case DECIMAL:
+        return BeamSqlPrimitive.of(
+            SqlTypeName.DECIMAL,
+            SqlFunctions.sign(SqlFunctions.toBigDecimal(op.getValue())));
+      default:
+        return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlSinExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlSinExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlSinExpression.java
new file mode 100644
index 0000000..25dc119
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlSinExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'SIN' function.
+ */
+public class BeamSqlSinExpression extends BeamSqlMathUnaryExpression {
+
+ public BeamSqlSinExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.DOUBLE);
+ }
+
+ @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DOUBLE, SqlFunctions.sin(SqlFunctions.toDouble(op.getValue())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlTanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlTanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlTanExpression.java
new file mode 100644
index 0000000..4edd570
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlTanExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'TAN' function.
+ */
+public class BeamSqlTanExpression extends BeamSqlMathUnaryExpression {
+
+ public BeamSqlTanExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.DOUBLE);
+ }
+
+ @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DOUBLE, SqlFunctions.tan(SqlFunctions.toDouble(op.getValue())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlTruncateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlTruncateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlTruncateExpression.java
new file mode 100644
index 0000000..1060a63
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlTruncateExpression.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathBinaryExpression} implementing the 'TRUNCATE' function.
+ *
+ * <p>The result carries the same SQL type as the first operand.
+ */
+public class BeamSqlTruncateExpression extends BeamSqlMathBinaryExpression {
+
+  public BeamSqlTruncateExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(0).getOutputType());
+  }
+
+  /**
+   * Truncates {@code leftOp} via {@code SqlFunctions.struncate}, using {@code rightOp}
+   * converted to an {@code int} as the second argument.
+   *
+   * @return a primitive of the same SQL type as {@code leftOp}, or {@code null} when that
+   *     type is not one of the numeric types handled below
+   */
+  @Override
+  public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+      BeamSqlPrimitive rightOp) {
+    // The right operand is converted once, before dispatching on the left operand's type.
+    final int places = SqlFunctions.toInt(rightOp.getValue());
+    switch (leftOp.getOutputType()) {
+      case TINYINT:
+        return BeamSqlPrimitive.of(SqlTypeName.TINYINT,
+            (byte) SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), places));
+      case SMALLINT:
+        return BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
+            (short) SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), places));
+      case INTEGER:
+        return BeamSqlPrimitive.of(SqlTypeName.INTEGER,
+            SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), places));
+      case BIGINT:
+        return BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+            SqlFunctions.struncate(leftOp.getLong(), places));
+      case FLOAT:
+        return BeamSqlPrimitive.of(SqlTypeName.FLOAT,
+            (float) SqlFunctions.struncate(SqlFunctions.toFloat(leftOp.getValue()), places));
+      case DOUBLE:
+        return BeamSqlPrimitive.of(SqlTypeName.DOUBLE,
+            SqlFunctions.struncate(SqlFunctions.toDouble(leftOp.getValue()), places));
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+            SqlFunctions.struncate(leftOp.getDecimal(), places));
+      default:
+        return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/package-info.java
new file mode 100644
index 0000000..740e1b5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * MATH functions/operators.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/package-info.java
new file mode 100644
index 0000000..c420361
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for operators in {@link org.apache.calcite.sql.fun.SqlStdOperatorTable}.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
new file mode 100644
index 0000000..580d747
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'CHAR_LENGTH' operator.
+ */
+public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
+ public BeamSqlCharLengthExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.INTEGER);
+ }
+
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ String str = opValueEvaluated(0, inputRow);
+ return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
new file mode 100644
index 0000000..772ad41
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * String concat operator.
+ */
+public class BeamSqlConcatExpression extends BeamSqlExpression {
+
+ protected BeamSqlConcatExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+
+ public BeamSqlConcatExpression(List<BeamSqlExpression> operands) {
+ super(operands, SqlTypeName.VARCHAR);
+ }
+
+ @Override public boolean accept() {
+ if (operands.size() != 2) {
+ return false;
+ }
+
+ for (BeamSqlExpression exp : getOperands()) {
+ if (!SqlTypeName.CHAR_TYPES.contains(exp.getOutputType())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ String left = opValueEvaluated(0, inputRow);
+ String right = opValueEvaluated(1, inputRow);
+
+ return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
+ new StringBuilder(left.length() + right.length())
+ .append(left).append(right).toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
new file mode 100644
index 0000000..dc893e7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'INITCAP' operator: upper-cases the first character of every whitespace-delimited word and
+ * lower-cases the remaining characters of that word.
+ */
+public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlInitCapExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  /**
+   * Evaluates the first operand as a string and returns its capitalized form as a VARCHAR.
+   * Whitespace characters are copied through unchanged.
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String input = opValueEvaluated(0, inputRow);
+
+    char[] chars = input.toCharArray();
+    // True while the next non-whitespace character starts a new word.
+    boolean atWordStart = true;
+    for (int pos = 0; pos < chars.length; pos++) {
+      char current = chars[pos];
+      if (Character.isWhitespace(current)) {
+        atWordStart = true;
+      } else if (atWordStart) {
+        chars[pos] = Character.toUpperCase(current);
+        atWordStart = false;
+      } else {
+        chars[pos] = Character.toLowerCase(current);
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, new String(chars));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
new file mode 100644
index 0000000..fd9d7aa
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'LOWER' operator: converts its string operand to lower case and returns it as a VARCHAR.
+ */
+public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlLowerExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  /**
+   * Evaluates the first operand as a string and lower-cases it.
+   *
+   * @param inputRow the row against which the operand is evaluated
+   * @return a VARCHAR primitive holding the lower-cased string
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    // Use a fixed locale: the no-argument toLowerCase() follows the JVM default locale, so
+    // the same query could give different results per machine (e.g. 'I' under a Turkish
+    // locale becomes a dotless 'i').
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase(java.util.Locale.ROOT));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
new file mode 100644
index 0000000..8d38efb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'OVERLAY' operator.
+ *
+ * <p>
+ * OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ])
+ * </p>
+ *
+ * <p>Replaces the section of {@code string1} that starts at the 1-based position
+ * {@code integer} with {@code string2}. The replaced section is {@code integer2} characters
+ * long, or as long as {@code string2} when {@code integer2} is omitted.
+ */
+public class BeamSqlOverlayExpression extends BeamSqlExpression {
+  public BeamSqlOverlayExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  /**
+   * Accepts 3 or 4 operands: two character operands followed by one or two integer operands.
+   */
+  @Override public boolean accept() {
+    if (operands.size() < 3 || operands.size() > 4) {
+      return false;
+    }
+
+    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
+        || !SqlTypeName.CHAR_TYPES.contains(opType(1))
+        || !SqlTypeName.INT_TYPES.contains(opType(2))) {
+      return false;
+    }
+
+    if (operands.size() == 4 && !SqlTypeName.INT_TYPES.contains(opType(3))) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Builds the overlaid string for {@code inputRow}.
+   *
+   * <p>Cut points that fall outside {@code string1} are clamped to its bounds, so a FOR length
+   * that runs past the end of the string simply replaces everything up to the end.
+   *
+   * @param inputRow the row the operands are evaluated against
+   * @return a VARCHAR primitive holding the overlaid string
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    String replaceStr = opValueEvaluated(1, inputRow);
+    // accept() admits every integer type, so read the operands as Number instead of
+    // unboxing straight to int, which fails for values that are not Integer.
+    Number position = opValueEvaluated(2, inputRow);
+    long length = replaceStr.length();
+    if (operands.size() == 4) {
+      Number forLength = opValueEvaluated(3, inputRow);
+      length = forLength.longValue();
+    }
+
+    // the index is 1 based.
+    long start = position.longValue() - 1;
+    // long arithmetic avoids int overflow; clamping keeps both cut points inside str.
+    int headEnd = (int) clamp(start, str.length());
+    int tailStart = (int) clamp(start + length, str.length());
+
+    StringBuilder result = new StringBuilder(
+        headEnd + replaceStr.length() + (str.length() - tailStart));
+    result.append(str, 0, headEnd)
+        .append(replaceStr)
+        .append(str, tailStart, str.length());
+
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString());
+  }
+
+  /** Limits {@code value} to the range {@code [0, max]}. */
+  private static long clamp(long value, int max) {
+    return Math.max(0, Math.min(value, max));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
new file mode 100644
index 0000000..ea5f749
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * String position operator.
+ *
+ * <p>
+ * example:
+ * POSITION(string1 IN string2)
+ * POSITION(string1 IN string2 FROM integer)
+ * </p>
+ *
+ * <p>The result is exactly what {@link String#indexOf(String, int)} returns: a 0-based index,
+ * or -1 when {@code string1} does not occur. The optional FROM operand is handed to
+ * {@code indexOf} unchanged.
+ *
+ * <p>NOTE(review): standard SQL POSITION is 1-based and yields 0 when nothing is found;
+ * confirm whether callers depend on the 0-based result before changing it.
+ */
+public class BeamSqlPositionExpression extends BeamSqlExpression {
+  public BeamSqlPositionExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.INTEGER);
+  }
+
+  /**
+   * Accepts two character operands, optionally followed by one integer operand.
+   */
+  @Override public boolean accept() {
+    int operandCount = operands.size();
+    if (operandCount != 2 && operandCount != 3) {
+      return false;
+    }
+
+    boolean bothStrings = SqlTypeName.CHAR_TYPES.contains(opType(0))
+        && SqlTypeName.CHAR_TYPES.contains(opType(1));
+    if (!bothStrings) {
+      return false;
+    }
+
+    return operandCount == 2 || SqlTypeName.INT_TYPES.contains(opType(2));
+  }
+
+  /**
+   * Searches the second operand for the first one.
+   *
+   * @param inputRow the row the operands are evaluated against
+   * @return an INTEGER primitive holding the index reported by {@code String.indexOf}
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String needle = opValueEvaluated(0, inputRow);
+    String haystack = opValueEvaluated(1, inputRow);
+
+    // Without a FROM operand the search starts at -1, which indexOf treats like 0.
+    int searchFrom = -1;
+    if (operands.size() == 3) {
+      Number fromOperand = opValueEvaluated(2, inputRow);
+      searchFrom = fromOperand.intValue();
+    }
+
+    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, haystack.indexOf(needle, searchFrom));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpression.java
new file mode 100644
index 0000000..1e1b553
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlStringUnaryExpression.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Common parent of the string operators that take exactly one character operand.
+ */
+public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression {
+  public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * Accepts a single operand whose type is one of the character types.
+   */
+  @Override public boolean accept() {
+    return operands.size() == 1 && SqlTypeName.CHAR_TYPES.contains(opType(0));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
new file mode 100644
index 0000000..25f205a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'SUBSTRING' operator.
+ *
+ * <p>
+ * SUBSTRING(string FROM integer)
+ * SUBSTRING(string FROM integer FOR integer)
+ * </p>
+ *
+ * <p>A positive start is 1-based and a negative start counts back from the end of the string.
+ * A start of 0, or one that falls outside the string, yields an empty string.
+ */
+public class BeamSqlSubstringExpression extends BeamSqlExpression {
+  public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  /**
+   * Accepts a character operand followed by one or two integer operands.
+   */
+  @Override public boolean accept() {
+    if (operands.size() < 2 || operands.size() > 3) {
+      return false;
+    }
+
+    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
+        || !SqlTypeName.INT_TYPES.contains(opType(1))) {
+      return false;
+    }
+
+    if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Extracts the requested substring for {@code inputRow}.
+   *
+   * @param inputRow the row the operands are evaluated against
+   * @return a VARCHAR primitive holding the substring; a negative FOR length is treated as 0
+   *     and a FOR length that runs past the end of the string is cut off at the end
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    // accept() admits every integer type, so read the operands as Number instead of
+    // unboxing straight to int, which fails for values that are not Integer.
+    Number from = opValueEvaluated(1, inputRow);
+    long startIdx = from.longValue();
+    if (startIdx > 0) {
+      // NOTE: SQL substring is 1 based(rather than 0 based)
+      startIdx -= 1;
+    } else if (startIdx < 0) {
+      // NOTE: SQL also support negative index...
+      startIdx += str.length();
+    } else {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
+    }
+
+    // A start beyond either end selects nothing; treat it like start 0 instead of letting
+    // String.substring throw.
+    if (startIdx < 0 || startIdx >= str.length()) {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
+    }
+
+    int begin = (int) startIdx;
+    if (operands.size() == 3) {
+      Number forLength = opValueEvaluated(2, inputRow);
+      long length = Math.max(0, forLength.longValue());
+      // Clamp the count before adding it to begin so the end index cannot overflow.
+      int count = (int) Math.min(length, (long) str.length() - begin);
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(begin, begin + count));
+    } else {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(begin));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
new file mode 100644
index 0000000..9493e24
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Trim operator.
+ *
+ * <p>
+ * TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2)
+ * </p>
+ *
+ * <p>With a single operand the string is trimmed with {@link String#trim()}. With three
+ * operands every repetition of {@code string1} is removed from the chosen end(s) of
+ * {@code string2}.
+ */
+public class BeamSqlTrimExpression extends BeamSqlExpression {
+  public BeamSqlTrimExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  /**
+   * Accepts either one character operand, or a SYMBOL operand followed by two character
+   * operands.
+   */
+  @Override public boolean accept() {
+    if (operands.size() != 1 && operands.size() != 3) {
+      return false;
+    }
+
+    if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) {
+      return false;
+    }
+
+    if (operands.size() == 3
+        && (
+        SqlTypeName.SYMBOL != opType(0)
+            || !SqlTypeName.CHAR_TYPES.contains(opType(1))
+            || !SqlTypeName.CHAR_TYPES.contains(opType(2)))
+        ) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Trims the string operand for {@code inputRow}.
+   *
+   * @param inputRow the row the operands are evaluated against
+   * @return a VARCHAR primitive holding the trimmed string
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    if (operands.size() == 1) {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
+          opValueEvaluated(0, inputRow).toString().trim());
+    } else {
+      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
+      String targetStr = opValueEvaluated(1, inputRow);
+      String containingStr = opValueEvaluated(2, inputRow);
+
+      switch (type) {
+        case LEADING:
+          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, leadingTrim(containingStr, targetStr));
+        case TRAILING:
+          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, trailingTrim(containingStr, targetStr));
+        case BOTH:
+        default:
+          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
+              trailingTrim(leadingTrim(containingStr, targetStr), targetStr));
+      }
+    }
+  }
+
+  /**
+   * Removes every repetition of {@code targetStr} from the start of {@code containingStr}.
+   *
+   * @return the remainder, or {@code containingStr} unchanged when {@code targetStr} is empty
+   */
+  static String leadingTrim(String containingStr, String targetStr) {
+    // An empty target matches at every offset without advancing, so the loop below
+    // would never terminate.
+    if (targetStr.isEmpty()) {
+      return containingStr;
+    }
+
+    int idx = 0;
+    while (containingStr.startsWith(targetStr, idx)) {
+      idx += targetStr.length();
+    }
+
+    return containingStr.substring(idx);
+  }
+
+  /**
+   * Removes every repetition of {@code targetStr} from the end of {@code containingStr}.
+   *
+   * @return the remainder, or {@code containingStr} unchanged when {@code targetStr} is empty
+   */
+  static String trailingTrim(String containingStr, String targetStr) {
+    // An empty target matches at every offset without moving back, so the loop below
+    // would never terminate.
+    if (targetStr.isEmpty()) {
+      return containingStr;
+    }
+
+    // Walk backwards one target-length at a time; startsWith is false for a negative offset,
+    // which ends the loop once the whole string has been consumed.
+    int idx = containingStr.length() - targetStr.length();
+    while (containingStr.startsWith(targetStr, idx)) {
+      idx -= targetStr.length();
+    }
+
+    // idx now points one target-length before the kept prefix ends; step forward again.
+    idx += targetStr.length();
+    return containingStr.substring(0, idx);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
new file mode 100644
index 0000000..9769c0e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'UPPER' operator: converts its single character operand to upper case.
+ *
+ * <p>Operand validation is inherited from {@link BeamSqlStringUnaryExpression}.
+ */
+public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlUpperExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  /**
+   * Evaluates the operand against {@code inputRow} and upper-cases the result.
+   *
+   * @param inputRow the row the operand is evaluated against
+   * @return a VARCHAR primitive holding the upper-cased string
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    // Pin the locale so the result does not vary with the JVM default locale
+    // (under a Turkish locale, for instance, 'i' upper-cases to a dotted capital I).
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase(java.util.Locale.ROOT));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java
new file mode 100644
index 0000000..f8fc4be
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * String operators.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/package-info.java
new file mode 100644
index 0000000..3e58a08
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The interpreter generates runnable 'code' to execute SQL relational expressions.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter;