You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:32 UTC

[09/59] beam git commit: move dsls/sql to sdks/java/extensions/sql

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
new file mode 100644
index 0000000..c9ff186
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.logical;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for Logical operators.
+ */
+public abstract class BeamSqlLogicalExpression extends BeamSqlExpression {
+  private BeamSqlLogicalExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+  public BeamSqlLogicalExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  @Override
+  public boolean accept() {
+    for (BeamSqlExpression exp : operands) {
+      // only accept BOOLEAN expression as operand
+      if (!exp.getOutputType().equals(SqlTypeName.BOOLEAN)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
new file mode 100644
index 0000000..6df52aa
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.logical;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for logical operator: NOT.
+ *
+ * <p>Whether boolean is not TRUE; returns UNKNOWN if boolean is UNKNOWN.
+ */
+public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
+  public BeamSqlNotExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public boolean accept() {
+    if (numberOfOperands() != 1) {
+      return false;
+    }
+    return super.accept();
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    Boolean value = opValueEvaluated(0, inputRow);
+    if (value == null) {
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);
+    } else {
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, !value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
new file mode 100644
index 0000000..450638c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.logical;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'OR' operation.
+ */
+public class BeamSqlOrExpression extends BeamSqlLogicalExpression {
+  public BeamSqlOrExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) {
+    boolean result = false;
+    for (BeamSqlExpression exp : operands) {
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
+        result = result || expOut.getValue();
+        if (result) {
+          break;
+        }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/package-info.java
new file mode 100644
index 0000000..7862045
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Logical operators.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.logical;

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
new file mode 100644
index 0000000..e563634
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ABS' function.
+ */
+public class BeamSqlAbsExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlAbsExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(0).getOutputType());
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    BeamSqlPrimitive result = null;
+    switch (op.getOutputType()) {
+      case INTEGER:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.INTEGER, SqlFunctions.abs(op.getInteger()));
+        break;
+      case BIGINT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.BIGINT, SqlFunctions.abs(op.getLong()));
+        break;
+      case TINYINT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.TINYINT, SqlFunctions.abs(op.getByte()));
+        break;
+      case SMALLINT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.SMALLINT, SqlFunctions.abs(op.getShort()));
+        break;
+      case FLOAT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.FLOAT, SqlFunctions.abs(op.getFloat()));
+        break;
+      case DECIMAL:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.DECIMAL, SqlFunctions.abs(new BigDecimal(op.getValue().toString())));
+        break;
+      case DOUBLE:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.abs(op.getDouble()));
+        break;
+      default:
+        break;
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAcosExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAcosExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAcosExpression.java
new file mode 100644
index 0000000..14b2a27
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAcosExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ACOS' function.
+ */
+public class BeamSqlAcosExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlAcosExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.acos(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAsinExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAsinExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAsinExpression.java
new file mode 100644
index 0000000..ed515b5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAsinExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ASIN' function.
+ */
+public class BeamSqlAsinExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlAsinExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.asin(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAtan2Expression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAtan2Expression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAtan2Expression.java
new file mode 100644
index 0000000..2254f99
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAtan2Expression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@link BeamSqlMathBinaryExpression} for 'ATAN2' function.
+ */
+public class BeamSqlAtan2Expression extends BeamSqlMathBinaryExpression {
+
+  public BeamSqlAtan2Expression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+      BeamSqlPrimitive rightOp) {
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions
+        .atan2(SqlFunctions.toDouble(leftOp.getValue()),
+            SqlFunctions.toDouble(rightOp.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAtanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAtanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAtanExpression.java
new file mode 100644
index 0000000..3a14d54
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAtanExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'ATAN' function.
+ */
+public class BeamSqlAtanExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlAtanExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.atan(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCeilExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCeilExpression.java
new file mode 100644
index 0000000..c32c4fe
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCeilExpression.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'CEIL' function.
+ */
+public class BeamSqlCeilExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlCeilExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    switch (getOutputType()) {
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, SqlFunctions.ceil(op.getDecimal()));
+      default:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.ceil(SqlFunctions.toDouble(op.getValue())));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCosExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCosExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCosExpression.java
new file mode 100644
index 0000000..d7fdc5f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCosExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'COS' function.
+ */
+public class BeamSqlCosExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlCosExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.cos(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCotExpression.java
new file mode 100644
index 0000000..a62f756
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlCotExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'COT' function.
+ */
+public class BeamSqlCotExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlCotExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.cot(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlDegreesExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlDegreesExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlDegreesExpression.java
new file mode 100644
index 0000000..e440479
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlDegreesExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'DEGREES' function.
+ */
+public class BeamSqlDegreesExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlDegreesExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.degrees(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlExpExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlExpExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlExpExpression.java
new file mode 100644
index 0000000..d34726d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlExpExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'EXP' function.
+ */
+public class BeamSqlExpExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlExpExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.exp(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlFloorExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlFloorExpression.java
new file mode 100644
index 0000000..47d7441
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlFloorExpression.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'FLOOR' function.
+ */
+public class BeamSqlFloorExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlFloorExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    switch (getOutputType()) {
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, SqlFunctions.floor(op.getDecimal()));
+      default:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.floor(SqlFunctions.toDouble(op.getValue())));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlLnExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlLnExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlLnExpression.java
new file mode 100644
index 0000000..7cc18bf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlLnExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'LN' function.
+ */
+public class BeamSqlLnExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlLnExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.ln(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlLogExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlLogExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlLogExpression.java
new file mode 100644
index 0000000..7253a1e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlLogExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'Log10' function.
+ */
+public class BeamSqlLogExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlLogExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.log10(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
new file mode 100644
index 0000000..05250c0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for all binary functions such as
+ * POWER, MOD, RAND_INTEGER, ATAN2, ROUND, TRUNCATE.
+ */
+public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression {
+
+  public BeamSqlMathBinaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override public boolean accept() {
+    return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && isOperandNumeric(opType(1));
+  }
+
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+    BeamSqlExpression leftOp = op(0);
+    BeamSqlExpression rightOp = op(1);
+    return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow));
+  }
+
+  /**
+   * The base method for implementation of math binary functions.
+   *
+   * @param leftOp {@link BeamSqlPrimitive}
+   * @param rightOp {@link BeamSqlPrimitive}
+   * @return {@link BeamSqlPrimitive}
+   */
+  public abstract BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+      BeamSqlPrimitive rightOp);
+
+  /**
+   * The method to check whether operands are numeric or not.
+   */
+  public boolean isOperandNumeric(SqlTypeName opType) {
+    return SqlTypeName.NUMERIC_TYPES.contains(opType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
new file mode 100644
index 0000000..5429057
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+
+/**
+ * Base class for all unary functions such as
+ * ABS, SQRT, LN, LOG10, EXP, CEIL, FLOOR, RAND, ACOS,
+ * ASIN, ATAN, COS, COT, DEGREES, RADIANS, SIGN, SIN, TAN.
+ */
+public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression {
+
+  public BeamSqlMathUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override public boolean accept() {
+    boolean acceptance = false;
+
+    if (numberOfOperands() == 1 && SqlTypeName.NUMERIC_TYPES.contains(opType(0))) {
+      acceptance = true;
+    }
+    return acceptance;
+  }
+
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+    BeamSqlExpression operand = op(0);
+    return calculate(operand.evaluate(inputRow));
+  }
+
+  /**
+   * For the operands of other type {@link SqlTypeName#NUMERIC_TYPES}.
+   * */
+
+  public abstract BeamSqlPrimitive calculate(BeamSqlPrimitive op);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
new file mode 100644
index 0000000..cf797dd
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPiExpression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for the PI function.
+ */
+public class BeamSqlPiExpression extends BeamSqlExpression {
+
+  public BeamSqlPiExpression() {
+    this.outputType = SqlTypeName.DOUBLE;
+  }
+
+  @Override public boolean accept() {
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPowerExpression.java
new file mode 100644
index 0000000..b1a8820
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlPowerExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathBinaryExpression} for 'POWER' function.
+ */
+public class BeamSqlPowerExpression extends BeamSqlMathBinaryExpression {
+
+  public BeamSqlPowerExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override
+  public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+      BeamSqlPrimitive rightOp) {
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions
+        .power(SqlFunctions.toDouble(leftOp.getValue()),
+            SqlFunctions.toDouble(rightOp.getValue())));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRadiansExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRadiansExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRadiansExpression.java
new file mode 100644
index 0000000..3a77634
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRadiansExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'RADIANS' function.
+ */
+public class BeamSqlRadiansExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlRadiansExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.radians(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandExpression.java
new file mode 100644
index 0000000..944936b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'RAND([seed])' function.
+ */
+public class BeamSqlRandExpression extends BeamSqlExpression {
+  private Random rand = new Random();
+  private Integer seed = null;
+
+  public BeamSqlRandExpression(List<BeamSqlExpression> subExps) {
+    super(subExps, SqlTypeName.DOUBLE);
+  }
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    if (operands.size() == 1) {
+      int rowSeed = opValueEvaluated(0, inputRecord);
+      if (seed == null || seed != rowSeed) {
+        rand.setSeed(rowSeed);
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, rand.nextDouble());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java
new file mode 100644
index 0000000..02e464f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'RAND_INTEGER([seed, ] numeric)'
+ * function.
+ */
+public class BeamSqlRandIntegerExpression extends BeamSqlExpression {
+  private Random rand = new Random();
+  private Integer seed = null;
+
+  public BeamSqlRandIntegerExpression(List<BeamSqlExpression> subExps) {
+    super(subExps, SqlTypeName.INTEGER);
+  }
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    int numericIdx = 0;
+    if (operands.size() == 2) {
+      int rowSeed = opValueEvaluated(0, inputRecord);
+      if (seed == null || seed != rowSeed) {
+        rand.setSeed(rowSeed);
+      }
+      numericIdx = 1;
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.INTEGER,
+        rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
new file mode 100644
index 0000000..a712c85
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathBinaryExpression} for 'ROUND' function.
+ */
+public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression {
+
+  private final BeamSqlPrimitive zero = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0);
+
+  public BeamSqlRoundExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(0).getOutputType());
+    checkForSecondOperand(operands);
+  }
+
+  private void checkForSecondOperand(List<BeamSqlExpression> operands) {
+    if (numberOfOperands() == 1) {
+      operands.add(1, zero);
+    }
+  }
+
+  @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+      BeamSqlPrimitive rightOp) {
+    BeamSqlPrimitive result = null;
+    switch (leftOp.getOutputType()) {
+      case SMALLINT:
+        result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
+            (short) roundInt(toInt(leftOp.getValue()), toInt(rightOp.getValue())));
+        break;
+      case TINYINT:
+        result = BeamSqlPrimitive.of(SqlTypeName.TINYINT,
+            (byte) roundInt(toInt(leftOp.getValue()), toInt(rightOp.getValue())));
+        break;
+      case INTEGER:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.INTEGER, roundInt(leftOp.getInteger(), toInt(rightOp.getValue())));
+        break;
+      case BIGINT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.BIGINT, roundLong(leftOp.getLong(), toInt(rightOp.getValue())));
+        break;
+      case DOUBLE:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, roundDouble(leftOp.getDouble(), toInt(rightOp.getValue())));
+        break;
+      case FLOAT:
+        result = BeamSqlPrimitive.of(SqlTypeName.FLOAT,
+            (float) roundDouble(leftOp.getFloat(), toInt(rightOp.getValue())));
+        break;
+      case DECIMAL:
+        result = BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+            roundBigDecimal(toBigDecimal(leftOp.getValue()), toInt(rightOp.getValue())));
+        break;
+      default:
+        break;
+    }
+    return result;
+  }
+
+  private int roundInt(int v1, int v2) {
+    return SqlFunctions.sround(v1, v2);
+  }
+
+  private double roundDouble(double v1, int v2) {
+    return SqlFunctions.sround(v1, v2);
+  }
+
+  private BigDecimal roundBigDecimal(BigDecimal v1, int v2) {
+    return SqlFunctions.sround(v1, v2);
+  }
+
+  private long roundLong(long v1, int v2) {
+    return SqlFunctions.sround(v1, v2);
+  }
+
+  private int toInt(Object value) {
+    return SqlFunctions.toInt(value);
+  }
+
+  private BigDecimal toBigDecimal(Object value) {
+    return SqlFunctions.toBigDecimal(value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
new file mode 100644
index 0000000..3f2d9af
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'SIGN' function.
+ */
+public class BeamSqlSignExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlSignExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(0).getOutputType());
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    BeamSqlPrimitive result = null;
+    switch (op.getOutputType()) {
+      case TINYINT:
+        result = BeamSqlPrimitive
+          .of(SqlTypeName.TINYINT, (byte) SqlFunctions.sign(SqlFunctions.toByte(op.getValue())));
+        break;
+      case SMALLINT:
+        result = BeamSqlPrimitive
+          .of(SqlTypeName.SMALLINT, (short) SqlFunctions.sign(SqlFunctions.toShort(op.getValue())));
+        break;
+      case INTEGER:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.INTEGER, SqlFunctions.sign(SqlFunctions.toInt(op.getValue())));
+        break;
+      case BIGINT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.BIGINT, SqlFunctions.sign(SqlFunctions.toLong(op.getValue())));
+        break;
+      case FLOAT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.FLOAT, (float) SqlFunctions.sign(SqlFunctions.toFloat(op.getValue())));
+        break;
+      case DOUBLE:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.sign(SqlFunctions.toDouble(op.getValue())));
+        break;
+      case DECIMAL:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.DECIMAL, SqlFunctions.sign(SqlFunctions.toBigDecimal(op.getValue())));
+        break;
+      default:
+        break;
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSinExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSinExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSinExpression.java
new file mode 100644
index 0000000..3459cd3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSinExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'SIN' function.
+ */
+public class BeamSqlSinExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlSinExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.sin(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlTanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlTanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlTanExpression.java
new file mode 100644
index 0000000..d874217
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlTanExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathUnaryExpression} for 'TAN' function.
+ */
+public class BeamSqlTanExpression extends BeamSqlMathUnaryExpression {
+
+  public BeamSqlTanExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DOUBLE);
+  }
+
+  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
+    return BeamSqlPrimitive
+        .of(SqlTypeName.DOUBLE, SqlFunctions.tan(SqlFunctions.toDouble(op.getValue())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlTruncateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlTruncateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlTruncateExpression.java
new file mode 100644
index 0000000..187f796
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlTruncateExpression.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlMathBinaryExpression} for 'TRUNCATE' function.
+ */
+public class BeamSqlTruncateExpression extends BeamSqlMathBinaryExpression {
+
+  public BeamSqlTruncateExpression(List<BeamSqlExpression> operands) {
+    super(operands, operands.get(0).getOutputType());
+  }
+
+  @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
+      BeamSqlPrimitive rightOp) {
+    BeamSqlPrimitive result = null;
+    int rightIntOperand = SqlFunctions.toInt(rightOp.getValue());
+    switch (leftOp.getOutputType()) {
+      case SMALLINT:
+        result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
+            (short) SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand));
+        break;
+      case TINYINT:
+        result = BeamSqlPrimitive.of(SqlTypeName.TINYINT,
+            (byte) SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand));
+        break;
+      case INTEGER:
+        result = BeamSqlPrimitive.of(SqlTypeName.INTEGER,
+            SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand));
+        break;
+      case BIGINT:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.BIGINT, SqlFunctions.struncate(leftOp.getLong(), rightIntOperand));
+        break;
+      case FLOAT:
+        result = BeamSqlPrimitive.of(SqlTypeName.FLOAT,
+            (float) SqlFunctions.struncate(SqlFunctions.toFloat(leftOp.getValue()),
+                rightIntOperand));
+        break;
+      case DOUBLE:
+        result = BeamSqlPrimitive.of(SqlTypeName.DOUBLE,
+            SqlFunctions.struncate(SqlFunctions.toDouble(leftOp.getValue()), rightIntOperand));
+        break;
+      case DECIMAL:
+        result = BeamSqlPrimitive
+            .of(SqlTypeName.DECIMAL, SqlFunctions.struncate(leftOp.getDecimal(), rightIntOperand));
+        break;
+      default:
+        break;
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/package-info.java
new file mode 100644
index 0000000..a7a5d0e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * MATH functions/operators.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.math;

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java
new file mode 100644
index 0000000..9b0a9a7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for operators in {@link org.apache.calcite.sql.fun.SqlStdOperatorTable}.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
new file mode 100644
index 0000000..7c61061
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'CHAR_LENGTH' operator.
+ */
+public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlCharLengthExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.INTEGER);
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
new file mode 100644
index 0000000..93e1f71
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * String concat operator.
+ */
+public class BeamSqlConcatExpression extends BeamSqlExpression {
+
+  protected BeamSqlConcatExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlConcatExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  @Override public boolean accept() {
+    if (operands.size() != 2) {
+      return false;
+    }
+
+    for (BeamSqlExpression exp : getOperands()) {
+      if (!SqlTypeName.CHAR_TYPES.contains(exp.getOutputType())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String left = opValueEvaluated(0, inputRow);
+    String right = opValueEvaluated(1, inputRow);
+
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
+        new StringBuilder(left.length() + right.length())
+            .append(left).append(right).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
new file mode 100644
index 0000000..7726e27
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'INITCAP' operator.
+ */
+public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlInitCapExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+
+    StringBuilder ret = new StringBuilder(str);
+    boolean isInit = true;
+    for (int i = 0; i < str.length(); i++) {
+      if (Character.isWhitespace(str.charAt(i))) {
+        isInit = true;
+        continue;
+      }
+
+      if (isInit) {
+        ret.setCharAt(i, Character.toUpperCase(str.charAt(i)));
+        isInit = false;
+      } else {
+        ret.setCharAt(i, Character.toLowerCase(str.charAt(i)));
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ret.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
new file mode 100644
index 0000000..cb198ec
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'LOWER' operator.
+ */
+public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlLowerExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
+  }
+}