You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:26 UTC

[03/59] beam git commit: move dsls/sql to sdks/java/extensions/sql

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
new file mode 100644
index 0000000..a34f109
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.arithmetic;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamSqlArithmeticExpression}.
+ */
+public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void testAccept_normal() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    // Checks accept() of BeamSqlPlusExpression for several two-operand type pairs.
+    // byte, short: expected to be accepted
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TINYINT, Byte.valueOf("1")));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.MAX_VALUE));
+    assertTrue(new BeamSqlPlusExpression(operands).accept());
+
+    // integer, long: expected to be accepted
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertTrue(new BeamSqlPlusExpression(operands).accept());
+
+    // float, double: expected to be accepted
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
+    assertTrue(new BeamSqlPlusExpression(operands).accept());
+
+    // float, varchar: a VARCHAR operand is expected to be rejected
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "1"));
+    assertFalse(new BeamSqlPlusExpression(operands).accept());
+  }
+
+  @Test public void testAccept_exception() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    // Despite the method name, no exception is asserted: accept() is expected to return false.
+    // more than 2 operands: three numeric operands are expected to be rejected
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TINYINT, Byte.valueOf("1")));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.MAX_VALUE));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.MAX_VALUE));
+    assertFalse(new BeamSqlPlusExpression(operands).accept());
+
+    // tinyint, boolean: a BOOLEAN operand is expected to be rejected
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TINYINT, Byte.valueOf("1")));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    assertFalse(new BeamSqlPlusExpression(operands).accept());
+  }
+
+  @Test public void testPlus() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    // Evaluates BeamSqlPlusExpression on operand pairs and checks the value it returns.
+    // integer + integer => integer
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(2, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+    // integer + long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+    // long + long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+    // float + long => float (expected value is computed in float arithmetic)
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(Float.valueOf(1.1F + 1),
+        new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+
+    // double + long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2.1, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testMinus() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    // Evaluates BeamSqlMinusExpression on operand pairs and checks the value it returns.
+    // integer - integer => integer
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(1, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+
+    // integer - long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+
+    // long - long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+
+    // float - long: compared through floatValue() with a tolerance of 0.1
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2.1F - 1L,
+        new BeamSqlMinusExpression(operands).evaluate(record).getValue().floatValue(), 0.1);
+
+    // double - long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(1.1, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testMultiply() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    // Evaluates BeamSqlMultiplyExpression on operand pairs and checks the value it returns.
+    // integer * integer => integer
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(2, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+    // integer * long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+    // long * long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+    // float * long => float (expected value is computed in float arithmetic)
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(Float.valueOf(2.1F * 1L),
+        new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+
+    // double * long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2.1, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testDivide() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    // Evaluates BeamSqlDivideExpression on operand pairs and checks the value it returns.
+    // integer / integer => integer
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(2, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+    // integer / long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+    // long / long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+    // float / long => float (expected value is computed in float arithmetic)
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2.1F / 1,
+        new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+
+    // double / long => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(2.1, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testMod() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    // Evaluates BeamSqlModExpression on integral operand pairs (3 mod 2) and checks the value.
+    // integer % integer => integer
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertEquals(1, new BeamSqlModExpression(operands).evaluate(record).getValue());
+
+    // integer % long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
+
+    // long % long => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
new file mode 100644
index 0000000..951fc8d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlCurrentDateExpression.
+ */
+public class BeamSqlCurrentDateExpressionTest extends BeamSqlDateExpressionTestBase {
+  @Test
+  public void test() {
+    assertEquals(SqlTypeName.DATE,
+        new BeamSqlCurrentDateExpression().evaluate(record).getOutputType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
new file mode 100644
index 0000000..ddf0a22
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlLocalTimeExpression.
+ */
+public class BeamSqlCurrentTimeExpressionTest extends BeamSqlDateExpressionTestBase {
+  @Test
+  public void test() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    assertEquals(SqlTypeName.TIME,
+        new BeamSqlCurrentTimeExpression(operands).evaluate(record).getOutputType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
new file mode 100644
index 0000000..a1554f1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlLocalTimestampExpression.
+ */
+public class BeamSqlCurrentTimestampExpressionTest extends BeamSqlDateExpressionTestBase {
+  @Test
+  public void test() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    assertEquals(SqlTypeName.TIMESTAMP,
+        new BeamSqlCurrentTimestampExpression(operands).evaluate(record).getOutputType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
new file mode 100644
index 0000000..8fc2178
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlDateCeilExpression}.
+ */
+public class BeamSqlDateCeilExpressionTest extends BeamSqlDateExpressionTestBase {
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE,
+        str2DateTime("2017-05-22 09:10:11")));
+    // YEAR
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
+    assertEquals(str2DateTime("2018-01-01 00:00:00"),
+        new BeamSqlDateCeilExpression(operands).evaluate(record).getDate());
+
+    operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
+    assertEquals(str2DateTime("2017-06-01 00:00:00"),
+        new BeamSqlDateCeilExpression(operands).evaluate(record).getDate());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateExpressionTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
new file mode 100644
index 0000000..bc906df
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+
+/**
+ * Shared helpers for date expression tests: parse "yyyy-MM-dd HH:mm:ss" text into time values.
+ */
+public class BeamSqlDateExpressionTestBase extends BeamSqlFnExecutorTestBase {
+  /** Epoch millis of {@code dateStr}; parsed in the JVM default time zone, not GMT. */
+  protected long str2LongTime(String dateStr) {
+    try {
+      // No time zone is set on this formatter, unlike the one in str2DateTime.
+      return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(dateStr).getTime();
+    } catch (ParseException cause) {
+      throw new RuntimeException(cause);
+    }
+  }
+
+  /** Parses {@code dateStr} as a GMT date-time; malformed text raises a RuntimeException. */
+  protected Date str2DateTime(String dateStr) {
+    SimpleDateFormat gmtFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    gmtFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+    try {
+      return gmtFormat.parse(dateStr);
+    } catch (ParseException cause) {
+      throw new RuntimeException(cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
new file mode 100644
index 0000000..3207d34
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlDateFloorExpression}.
+ */
+public class BeamSqlDateFloorExpressionTest extends BeamSqlDateExpressionTestBase {
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE,
+        str2DateTime("2017-05-22 09:10:11")));
+    // YEAR
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
+    assertEquals(str2DateTime("2017-01-01 00:00:00"),
+        new BeamSqlDateFloorExpression(operands).evaluate(record).getDate());
+    // MONTH
+    operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
+    assertEquals(str2DateTime("2017-05-01 00:00:00"),
+        new BeamSqlDateFloorExpression(operands).evaluate(record).getDate());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpressionTest.java
new file mode 100644
index 0000000..88909a0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpressionTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlExtractExpression}: extracts several units from one fixed timestamp.
+ */
+public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase {
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    long time = str2LongTime("2017-05-22 16:17:18");
+    // NOTE(review): str2LongTime uses the JVM default time zone - confirm that is intended.
+    // YEAR => 2017
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(2017L,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // MONTH => 5
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(5L,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // DAY => 22
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DAY));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(22L,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // DAY_OF_WEEK (DOW) => 2
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DOW));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(2L,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // DAY_OF_YEAR (DOY) => 142
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DOY));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(142L,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // WEEK => 21
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.WEEK));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(21L,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // QUARTER => 2
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.QUARTER));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(2L,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpressionTest.java
new file mode 100644
index 0000000..1dd602b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpressionTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks {@code BeamSqlNotExpression} against false, true and null operands.
+ */
+public class BeamSqlNotExpressionTest extends BeamSqlFnExecutorTestBase {
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> falseArg = new ArrayList<>();
+    falseArg.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+    Assert.assertTrue(new BeamSqlNotExpression(falseArg).evaluate(record).getBoolean());
+
+    List<BeamSqlExpression> trueArg = new ArrayList<>();
+    trueArg.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    Assert.assertFalse(new BeamSqlNotExpression(trueArg).evaluate(record).getBoolean());
+
+    List<BeamSqlExpression> nullArg = new ArrayList<>();
+    nullArg.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null));
+    Assert.assertNull(new BeamSqlNotExpression(nullArg).evaluate(record).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
new file mode 100644
index 0000000..ddb27a9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link BeamSqlMathBinaryExpression}: round, power, truncate and atan2.
+ */
+public class BeamSqlMathBinaryExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void testForGreaterThanTwoOperands() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // more than 2 operands are not allowed
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    Assert.assertFalse(new BeamSqlRoundExpression(operands).accept());
+  }
+
+  @Test public void testForOneOperand() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    // a single operand is enough for the round function
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForOperandsType() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // varchar operand not allowed
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
+    Assert.assertFalse(new BeamSqlRoundExpression(operands).accept());
+  }
+
+  @Test public void testRoundFunction() {
+    // test round functions with operands of type double, int, bigint and smallint
+    // (one- and two-operand forms), and with an input reference as first operand
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    // round(double, double) => double
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.0));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 4.0));
+    assertEquals(2.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    // round(integer,integer) => integer
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+
+    // round(long,long) => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 5L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
+    assertEquals(5L, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+
+    // round(short) => short
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("4")));
+    assertEquals(SqlFunctions.toShort(4),
+        new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+
+    // round(long) => long
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    assertEquals(2L, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+
+    // round(double, long) => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(1.1, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.368768));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertEquals(2.37, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 3.78683686458));
+    assertEquals(4.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 378.683686458));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -2));
+    assertEquals(400.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 378.683686458));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
+    assertEquals(380.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+
+    // round(integer, double) => integer
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.2));
+    assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+
+    // operand with a BeamSqlInputRefExpression
+    // to select a column value from row of a record
+    operands.clear();
+    BeamSqlInputRefExpression ref0 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
+    operands.add(ref0);
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+
+    assertEquals(1234567L, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testPowerFunction() {
+    // test power functions with operands of type double, int and bigint;
+    // every combination asserted below yields a double
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    // power(double, double) => double
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.0));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 4.0));
+    assertEquals(16.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    // power(integer,integer) => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertEquals(4.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    // power(integer,long) => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
+    assertEquals(8.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+
+    // power(long,long) => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
+    assertEquals(4.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+
+    // power(double, int) => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(1.1, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+
+    // power(double, long) => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
+    assertEquals(1.1, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+
+    // power(integer, double) => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.2));
+    assertEquals(Math.pow(2, 2.2),
+        new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForTruncate() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.0));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 4.0));
+    assertEquals(2.0, new BeamSqlTruncateExpression(operands).evaluate(record).getValue());
+    // truncate(double, integer) => double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.80685));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
+    assertEquals(2.8068, new BeamSqlTruncateExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForAtan2() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.875));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.56));
+    assertEquals(Math.atan2(0.875, 0.56),
+        new BeamSqlAtan2Expression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
new file mode 100644
index 0000000..71c98d4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.math;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link BeamSqlMathUnaryExpression}.
+ */
+public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void testForGreaterThanOneOperands() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // more than one operand is not allowed
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
+    Assert.assertFalse(new BeamSqlAbsExpression(operands).accept());
+  }
+
+  @Test public void testForOperandsType() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // varchar operand not allowed
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2"));
+    Assert.assertFalse(new BeamSqlAbsExpression(operands).accept());
+  }
+
+  @Test public void testForUnaryExpressions() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // NOTE(review): labelled "sqrt", but this operand is cleared below without being evaluated
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
+
+    // test for abs function
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, -28965734597L));
+    Assert
+        .assertEquals(28965734597L, new BeamSqlAbsExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForLnExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for LN function with operand type smallint
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
+    Assert.assertEquals(Math.log(2), new BeamSqlLnExpression(operands).evaluate(record).getValue());
+
+    // test for LN function with operand type double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
+    Assert
+        .assertEquals(Math.log(2.4), new BeamSqlLnExpression(operands).evaluate(record).getValue());
+    // test for LN function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
+    Assert.assertEquals(Math.log(2.56),
+        new BeamSqlLnExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForLog10Expression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for log10 function with operand type smallint
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
+    Assert.assertEquals(Math.log10(2),
+        new BeamSqlLogExpression(operands).evaluate(record).getValue());
+    // test for log10 function with operand type double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
+    Assert.assertEquals(Math.log10(2.4),
+        new BeamSqlLogExpression(operands).evaluate(record).getValue());
+    // test for log10 function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
+    Assert.assertEquals(Math.log10(2.56),
+        new BeamSqlLogExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForExpExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for exp function with operand type smallint
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
+    Assert
+        .assertEquals(Math.exp(2), new BeamSqlExpExpression(operands).evaluate(record).getValue());
+    // test for exp function with operand type double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
+    Assert.assertEquals(Math.exp(2.4),
+        new BeamSqlExpExpression(operands).evaluate(record).getValue());
+    // test for exp function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
+    Assert.assertEquals(Math.exp(2.56),
+        new BeamSqlExpExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForAcosExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for acos function with operand type smallint (the result asserted for 2 is NaN)
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
+    Assert
+        .assertEquals(Double.NaN, new BeamSqlAcosExpression(operands).evaluate(record).getValue());
+    // test for acos function with operand type double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
+    Assert.assertEquals(Math.acos(0.45),
+        new BeamSqlAcosExpression(operands).evaluate(record).getValue());
+    // test for acos function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
+    Assert.assertEquals(Math.acos(-0.367),
+        new BeamSqlAcosExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForAsinExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for asin function with operand type double
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
+    Assert.assertEquals(Math.asin(0.45),
+        new BeamSqlAsinExpression(operands).evaluate(record).getValue());
+    // test for asin function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
+    Assert.assertEquals(Math.asin(-0.367),
+        new BeamSqlAsinExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForAtanExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for atan function with operand type double
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
+    Assert.assertEquals(Math.atan(0.45),
+        new BeamSqlAtanExpression(operands).evaluate(record).getValue());
+    // test for atan function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
+    Assert.assertEquals(Math.atan(-0.367),
+        new BeamSqlAtanExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForCosExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for cos function with operand type double
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
+    Assert.assertEquals(Math.cos(0.45),
+        new BeamSqlCosExpression(operands).evaluate(record).getValue());
+    // test for cos function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
+    Assert.assertEquals(Math.cos(-0.367),
+        new BeamSqlCosExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForCotExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for cot function with operand type double
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, .45));
+    Assert.assertEquals(1.0d / Math.tan(0.45),
+        new BeamSqlCotExpression(operands).evaluate(record).getValue());
+    // test for cot function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-.367)));
+    Assert.assertEquals(1.0d / Math.tan(-0.367),
+        new BeamSqlCotExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForDegreesExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for degrees function with operand type smallint
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
+    Assert.assertEquals(Math.toDegrees(2),
+        new BeamSqlDegreesExpression(operands).evaluate(record).getValue());
+    // test for degrees function with operand type double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
+    Assert.assertEquals(Math.toDegrees(2.4),
+        new BeamSqlDegreesExpression(operands).evaluate(record).getValue());
+    // test for degrees function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
+    Assert.assertEquals(Math.toDegrees(2.56),
+        new BeamSqlDegreesExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForRadiansExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for radians function with operand type smallint
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
+    Assert.assertEquals(Math.toRadians(2),
+        new BeamSqlRadiansExpression(operands).evaluate(record).getValue());
+    // test for radians function with operand type double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
+    Assert.assertEquals(Math.toRadians(2.4),
+        new BeamSqlRadiansExpression(operands).evaluate(record).getValue());
+    // test for radians function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
+    Assert.assertEquals(Math.toRadians(2.56),
+        new BeamSqlRadiansExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForSinExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for sin function with operand type smallint
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
+    Assert
+        .assertEquals(Math.sin(2), new BeamSqlSinExpression(operands).evaluate(record).getValue());
+    // test for sin function with operand type double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
+    Assert.assertEquals(Math.sin(2.4),
+        new BeamSqlSinExpression(operands).evaluate(record).getValue());
+    // test for sin function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
+    Assert.assertEquals(Math.sin(2.56),
+        new BeamSqlSinExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForTanExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for tan function with operand type smallint
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
+    Assert
+        .assertEquals(Math.tan(2), new BeamSqlTanExpression(operands).evaluate(record).getValue());
+    // test for tan function with operand type double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
+    Assert.assertEquals(Math.tan(2.4),
+        new BeamSqlTanExpression(operands).evaluate(record).getValue());
+    // test for tan function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
+    Assert.assertEquals(Math.tan(2.56),
+        new BeamSqlTanExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForSignExpression() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    // test for sign function with operand type smallint
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
+    Assert.assertEquals((short) 1, new BeamSqlSignExpression(operands).evaluate(record).getValue());
+    // test for sign function with operand type double
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
+    Assert.assertEquals(1.0, new BeamSqlSignExpression(operands).evaluate(record).getValue());
+    // test for sign function with operand type decimal
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
+    Assert.assertEquals(BigDecimal.ONE,
+        new BeamSqlSignExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForPi() {
+    Assert.assertEquals(Math.PI, new BeamSqlPiExpression().evaluate(record).getValue());
+  }
+
+  @Test public void testForCeil() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.68687979));
+    Assert.assertEquals(Math.ceil(2.68687979),
+        new BeamSqlCeilExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test public void testForFloor() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.68687979));
+    Assert.assertEquals(Math.floor(2.68687979),
+        new BeamSqlFloorExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
new file mode 100644
index 0000000..b749099
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlCharLengthExpression.
+ */
+public class BeamSqlCharLengthExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertEquals(5,
+        new BeamSqlCharLengthExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
new file mode 100644
index 0000000..c77e1e6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlConcatExpression.
+ */
+public class BeamSqlConcatExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    assertTrue(new BeamSqlConcatExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    assertFalse(new BeamSqlConcatExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertFalse(new BeamSqlConcatExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " world"));
+    assertEquals("hello world",
+        new BeamSqlConcatExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
new file mode 100644
index 0000000..557f235
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test of BeamSqlInitCapExpression.
+ */
+public class BeamSqlInitCapExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  /** Evaluates INITCAP over a single VARCHAR operand against the inherited record. */
+  private Object initCap(String input) throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, input));
+    return new BeamSqlInitCapExpression(operands).evaluate(record).getValue();
+  }
+
+  /** Checks capitalization for lower-case, mixed-case and multi-space input. */
+  @Test public void evaluate() throws Exception {
+    assertEquals("Hello World", initCap("hello world"));
+
+    // Mixed case is normalized, not just the first letter.
+    assertEquals("Hello World", initCap("hEllO wOrld"));
+
+    // The run of spaces between the words is preserved.
+    assertEquals("Hello     World", initCap("hello     world"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
new file mode 100644
index 0000000..9abbfd8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test of BeamSqlLowerExpression.
+ */
+public class BeamSqlLowerExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "HELLO"));
+    assertEquals("hello",
+        new BeamSqlLowerExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
new file mode 100644
index 0000000..e98fd62
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlOverlayExpression.
+ */
+public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertTrue(new BeamSqlOverlayExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertTrue(new BeamSqlOverlayExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    assertEquals("w3resou3rce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
+    assertEquals("w3resou33rce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    assertEquals("w3resou3rce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7));
+    assertEquals("w3resouce",
+        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
new file mode 100644
index 0000000..4627610
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlPositionExpression.
+ */
+public class BeamSqlPositionExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    assertTrue(new BeamSqlPositionExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertTrue(new BeamSqlPositionExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    assertFalse(new BeamSqlPositionExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertFalse(new BeamSqlPositionExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals(-1, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
new file mode 100644
index 0000000..9bb553f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpressionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlStringUnaryExpression.
+ */
+public class BeamSqlStringUnaryExpressionTest {
+
+  /** A lone VARCHAR operand is accepted; an INTEGER operand or two operands are rejected. */
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> oneVarchar = new ArrayList<>();
+    oneVarchar.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertTrue(new BeamSqlCharLengthExpression(oneVarchar).accept());
+
+    List<BeamSqlExpression> oneInteger = new ArrayList<>();
+    oneInteger.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertFalse(new BeamSqlCharLengthExpression(oneInteger).accept());
+
+    List<BeamSqlExpression> twoVarchars = new ArrayList<>();
+    twoVarchars.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    twoVarchars.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertFalse(new BeamSqlCharLengthExpression(twoVarchars).accept());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
new file mode 100644
index 0000000..8d54522
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlSubstringExpression.
+ */
+public class BeamSqlSubstringExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertTrue(new BeamSqlSubstringExpression(operands).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertTrue(new BeamSqlSubstringExpression(operands).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    assertEquals("hello",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
+    assertEquals("he",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    assertEquals("hello",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100));
+    assertEquals("hello",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0));
+    assertEquals("",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
+    assertEquals("",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
+    assertEquals("o",
+        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+  }
+
+}