You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/07 21:06:09 UTC

[2/4] beam git commit: Remove redundant windowing information from the BeamRecord itself element window information `BoundedWindow` is added in `BeamSqlExpression`.

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index a95c743..45dc621 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -52,10 +52,9 @@ public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> {
   @ProcessElement
   public void processElement(ProcessContext c, BoundedWindow window) {
     BeamRecord inputRow = c.element();
-    List<Object> results = executor.execute(inputRow);
+    List<Object> results = executor.execute(inputRow, window);
 
     BeamRecord outRow = new BeamRecord(outputRowType);
-    outRow.updateWindowRange(inputRow, window);
 
     for (int idx = 0; idx < results.size(); ++idx) {
       BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 8501157..71278ec 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
 import org.junit.Test;
 
 /**
@@ -224,15 +223,11 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
     record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
-    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
 
     BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 1L);
     record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
-    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
-    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
 
     PAssert.that(result).containsInAnyOrder(record1, record2);
 
@@ -271,29 +266,21 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
     record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
-    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime()));
-    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
 
     BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 3L);
     record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
-    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
 
     BeamRecord record3 = new BeamRecord(resultType);
     record3.addField("f_int2", 0);
     record3.addField("size", 1L);
     record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
-    record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
-    record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime()));
 
     BeamRecord record4 = new BeamRecord(resultType);
     record4.addField("f_int2", 0);
     record4.addField("size", 1L);
     record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
-    record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
-    record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
 
     PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
 
@@ -333,15 +320,11 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
     record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
-    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime()));
-    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime()));
 
     BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 1L);
     record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
-    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime()));
-    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime()));
 
     PAssert.that(result).containsInAnyOrder(record1, record2);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
index 5278871..1bcda2c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
@@ -34,22 +34,22 @@ public class BeamNullExperssionTest extends BeamSqlFnExecutorTestBase {
   public void testIsNull() {
     BeamSqlIsNullExpression exp1 = new BeamSqlIsNullExpression(
         new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
-    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+    Assert.assertEquals(false, exp1.evaluate(record, null).getValue());
 
     BeamSqlIsNullExpression exp2 = new BeamSqlIsNullExpression(
         BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
-    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+    Assert.assertEquals(true, exp2.evaluate(record, null).getValue());
   }
 
   @Test
   public void testIsNotNull() {
     BeamSqlIsNotNullExpression exp1 = new BeamSqlIsNotNullExpression(
         new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
-    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+    Assert.assertEquals(true, exp1.evaluate(record, null).getValue());
 
     BeamSqlIsNotNullExpression exp2 = new BeamSqlIsNotNullExpression(
         BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
-    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+    Assert.assertEquals(false, exp2.evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
index f6e33b5..51a170d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
@@ -37,11 +37,11 @@ public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
 
-    Assert.assertTrue(new BeamSqlAndExpression(operands).evaluate(record).getValue());
+    Assert.assertTrue(new BeamSqlAndExpression(operands).evaluate(record, null).getValue());
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
 
-    Assert.assertFalse(new BeamSqlAndExpression(operands).evaluate(record).getValue());
+    Assert.assertFalse(new BeamSqlAndExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test
@@ -50,11 +50,11 @@ public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
 
-    Assert.assertFalse(new BeamSqlOrExpression(operands).evaluate(record).getValue());
+    Assert.assertFalse(new BeamSqlOrExpression(operands).evaluate(record, null).getValue());
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
 
-    Assert.assertTrue(new BeamSqlOrExpression(operands).evaluate(record).getValue());
+    Assert.assertTrue(new BeamSqlOrExpression(operands).evaluate(record, null).getValue());
 
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
index 068f041..e02554f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
@@ -72,14 +72,14 @@ public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
     assertEquals("hello", new BeamSqlCaseExpression(operands)
-        .evaluate(record).getValue());
+        .evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
     assertEquals("world", new BeamSqlCaseExpression(operands)
-        .evaluate(record).getValue());
+        .evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
@@ -88,6 +88,6 @@ public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello1"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
     assertEquals("hello1", new BeamSqlCaseExpression(operands)
-        .evaluate(record).getValue());
+        .evaluate(record, null).getValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
index 0c0aaa5..f4e3cf9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
@@ -52,14 +52,14 @@ public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
   public void testForIntegerToBigintTypeCasting() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
     Assert.assertEquals(5L,
-        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record, null).getLong());
   }
 
   @Test
   public void testForDoubleToBigIntCasting() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45));
     Assert.assertEquals(5L,
-        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record, null).getLong());
   }
 
   @Test
@@ -67,7 +67,7 @@ public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for yyyyMMdd format
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521));
     Assert.assertEquals(Date.valueOf("2017-05-21"),
-        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record, null).getValue());
   }
 
   @Test
@@ -75,7 +75,7 @@ public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
     //test for yyyy-MM-dd format
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21"));
     Assert.assertEquals(Date.valueOf("2017-05-21"),
-        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record, null).getValue());
   }
 
   @Test
@@ -83,14 +83,14 @@ public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for yy.MM.dd format
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21"));
     Assert.assertEquals(Date.valueOf("2017-05-21"),
-        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record, null).getValue());
   }
 
   @Test
   public void testForTimestampCastExpression() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989"));
     Assert.assertEquals(SqlTypeName.TIMESTAMP,
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record)
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record, null)
             .getOutputType());
   }
 
@@ -98,28 +98,32 @@ public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
   public void testDateTimeFormatWithMillis() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989"));
     Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP)
+          .evaluate(record, null).getValue());
   }
 
   @Test
   public void testDateTimeFormatWithTimezone() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST"));
     Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP)
+          .evaluate(record, null).getValue());
   }
 
   @Test
   public void testDateTimeFormat() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59"));
     Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP)
+          .evaluate(record, null).getValue());
   }
 
   @Test(expected = RuntimeException.class)
   public void testForCastTypeNotSupported() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime()));
     Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP)
+          .evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
index ae3a12f..8aad6b3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
@@ -40,12 +40,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlEqualsExpression exp1 = new BeamSqlEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 100L)));
-    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+    Assert.assertEquals(false, exp1.evaluate(record, null).getValue());
 
     BeamSqlEqualsExpression exp2 = new BeamSqlEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
-    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+    Assert.assertEquals(true, exp2.evaluate(record, null).getValue());
   }
 
   @Test
@@ -53,12 +53,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlGreaterThanExpression exp1 = new BeamSqlGreaterThanExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
-    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+    Assert.assertEquals(false, exp1.evaluate(record, null).getValue());
 
     BeamSqlGreaterThanExpression exp2 = new BeamSqlGreaterThanExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234566L)));
-    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+    Assert.assertEquals(true, exp2.evaluate(record, null).getValue());
   }
 
   @Test
@@ -66,12 +66,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlGreaterThanOrEqualsExpression exp1 = new BeamSqlGreaterThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
-    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+    Assert.assertEquals(true, exp1.evaluate(record, null).getValue());
 
     BeamSqlGreaterThanOrEqualsExpression exp2 = new BeamSqlGreaterThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234568L)));
-    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+    Assert.assertEquals(false, exp2.evaluate(record, null).getValue());
   }
 
   @Test
@@ -79,12 +79,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlLessThanExpression exp1 = new BeamSqlLessThanExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1),
             BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)));
-    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+    Assert.assertEquals(true, exp1.evaluate(record, null).getValue());
 
     BeamSqlLessThanExpression exp2 = new BeamSqlLessThanExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1),
             BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1)));
-    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+    Assert.assertEquals(false, exp2.evaluate(record, null).getValue());
   }
 
   @Test
@@ -92,12 +92,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlLessThanOrEqualsExpression exp1 = new BeamSqlLessThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2),
             BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.9)));
-    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+    Assert.assertEquals(true, exp1.evaluate(record, null).getValue());
 
     BeamSqlLessThanOrEqualsExpression exp2 = new BeamSqlLessThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2),
             BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.0)));
-    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+    Assert.assertEquals(false, exp2.evaluate(record, null).getValue());
   }
 
   @Test
@@ -105,11 +105,11 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlNotEqualsExpression exp1 = new BeamSqlNotEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
-    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+    Assert.assertEquals(false, exp1.evaluate(record, null).getValue());
 
     BeamSqlNotEqualsExpression exp2 = new BeamSqlNotEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 0L)));
-    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+    Assert.assertEquals(true, exp2.evaluate(record, null).getValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
index c78f9c0..e543d4f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
@@ -30,28 +30,28 @@ public class BeamSqlInputRefExpressionTest extends BeamSqlFnExecutorTestBase {
   @Test
   public void testRefInRange() {
     BeamSqlInputRefExpression ref0 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
-    Assert.assertEquals(record.getLong(0), ref0.evaluate(record).getValue());
+    Assert.assertEquals(record.getLong(0), ref0.evaluate(record, null).getValue());
 
     BeamSqlInputRefExpression ref1 = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1);
-    Assert.assertEquals(record.getInteger(1), ref1.evaluate(record).getValue());
+    Assert.assertEquals(record.getInteger(1), ref1.evaluate(record, null).getValue());
 
     BeamSqlInputRefExpression ref2 = new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2);
-    Assert.assertEquals(record.getDouble(2), ref2.evaluate(record).getValue());
+    Assert.assertEquals(record.getDouble(2), ref2.evaluate(record, null).getValue());
 
     BeamSqlInputRefExpression ref3 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3);
-    Assert.assertEquals(record.getLong(3), ref3.evaluate(record).getValue());
+    Assert.assertEquals(record.getLong(3), ref3.evaluate(record, null).getValue());
   }
 
 
   @Test(expected = IndexOutOfBoundsException.class)
   public void testRefOutOfRange(){
     BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 4);
-    ref.evaluate(record).getValue();
+    ref.evaluate(record, null).getValue();
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testTypeUnMatch(){
     BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 0);
-    ref.evaluate(record).getValue();
+    ref.evaluate(record, null).getValue();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
index c4e3d3f..81f9ce0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
@@ -31,28 +31,28 @@ public class BeamSqlPrimitiveTest extends BeamSqlFnExecutorTestBase {
   @Test
   public void testPrimitiveInt(){
     BeamSqlPrimitive<Integer> expInt = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100);
-    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record, null).getValue());
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testPrimitiveTypeUnMatch1(){
     BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100L);
-    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record, null).getValue());
   }
   @Test(expected = IllegalArgumentException.class)
   public void testPrimitiveTypeUnMatch2(){
     BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.DECIMAL, 100L);
-    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record, null).getValue());
   }
   @Test(expected = IllegalArgumentException.class)
   public void testPrimitiveTypeUnMatch3(){
     BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.FLOAT, 100L);
-    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record, null).getValue());
   }
   @Test(expected = IllegalArgumentException.class)
   public void testPrimitiveTypeUnMatch4(){
     BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 100L);
-    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record, null).getValue());
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
index 2e01737..e614fdf 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
@@ -69,7 +69,7 @@ public class BeamSqlReinterpretExpressionTest extends BeamSqlFnExecutorTestBase
     d.setTime(1000);
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, d));
     assertEquals(1000L, new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT)
-        .evaluate(record).getValue());
+        .evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
index c4732f5..19098a6 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
@@ -37,7 +37,7 @@ public class BeamSqlUdfExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlUdfExpression exp = new BeamSqlUdfExpression(
         UdfFn.class.getMethod("negative", Integer.class), operands, SqlTypeName.INTEGER);
 
-    Assert.assertEquals(-10, exp.evaluate(record).getValue());
+    Assert.assertEquals(-10, exp.evaluate(record, null).getValue());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
index 44001f9..a8d5e43 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
@@ -84,32 +84,32 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => integer
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(2, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+    assertEquals(2, new BeamSqlPlusExpression(operands).evaluate(record, null).getValue());
 
     // integer + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record, null).getValue());
 
     // long + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record, null).getValue());
 
     // float + long => float
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     assertEquals(Float.valueOf(1.1F + 1),
-        new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+        new BeamSqlPlusExpression(operands).evaluate(record, null).getValue());
 
     // double + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2.1, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+    assertEquals(2.1, new BeamSqlPlusExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testMinus() {
@@ -118,32 +118,32 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => long
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(1, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+    assertEquals(1, new BeamSqlMinusExpression(operands).evaluate(record, null).getValue());
 
     // integer + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record, null).getValue());
 
     // long + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record, null).getValue());
 
     // float + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     assertEquals(2.1F - 1L,
-        new BeamSqlMinusExpression(operands).evaluate(record).getValue().floatValue(), 0.1);
+        new BeamSqlMinusExpression(operands).evaluate(record, null).getValue().floatValue(), 0.1);
 
     // double + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(1.1, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+    assertEquals(1.1, new BeamSqlMinusExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testMultiply() {
@@ -152,32 +152,32 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => integer
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(2, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+    assertEquals(2, new BeamSqlMultiplyExpression(operands).evaluate(record, null).getValue());
 
     // integer + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record, null).getValue());
 
     // long + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record, null).getValue());
 
     // float + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     assertEquals(Float.valueOf(2.1F * 1L),
-        new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+        new BeamSqlMultiplyExpression(operands).evaluate(record, null).getValue());
 
     // double + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2.1, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+    assertEquals(2.1, new BeamSqlMultiplyExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testDivide() {
@@ -186,32 +186,32 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => integer
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(2, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+    assertEquals(2, new BeamSqlDivideExpression(operands).evaluate(record, null).getValue());
 
     // integer + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record, null).getValue());
 
     // long + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record, null).getValue());
 
     // float + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     assertEquals(2.1F / 1,
-        new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+        new BeamSqlDivideExpression(operands).evaluate(record, null).getValue());
 
     // double + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2.1, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+    assertEquals(2.1, new BeamSqlDivideExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testMod() {
@@ -220,18 +220,18 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => long
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    assertEquals(1, new BeamSqlModExpression(operands).evaluate(record).getValue());
+    assertEquals(1, new BeamSqlModExpression(operands).evaluate(record, null).getValue());
 
     // integer + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
-    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
+    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record, null).getValue());
 
     // long + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
-    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
+    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record, null).getValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
index cd390c4..bfca720 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
@@ -32,7 +32,7 @@ public class BeamSqlCurrentDateExpressionTest extends BeamSqlDateExpressionTestB
     Assert.assertEquals(
         SqlTypeName.DATE,
         new BeamSqlCurrentDateExpression()
-            .evaluate(BeamSqlFnExecutorTestBase.record).getOutputType()
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getOutputType()
     );
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
index 416df01..af3cacd 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
@@ -34,6 +34,6 @@ public class BeamSqlCurrentTimeExpressionTest extends BeamSqlDateExpressionTestB
   public void test() {
     List<BeamSqlExpression> operands = new ArrayList<>();
     assertEquals(SqlTypeName.TIME,
-        new BeamSqlCurrentTimeExpression(operands).evaluate(record).getOutputType());
+        new BeamSqlCurrentTimeExpression(operands).evaluate(record, null).getOutputType());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
index d44b6c1..c171e40 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
@@ -34,6 +34,6 @@ public class BeamSqlCurrentTimestampExpressionTest extends BeamSqlDateExpression
   public void test() {
     List<BeamSqlExpression> operands = new ArrayList<>();
     assertEquals(SqlTypeName.TIMESTAMP,
-        new BeamSqlCurrentTimestampExpression(operands).evaluate(record).getOutputType());
+        new BeamSqlCurrentTimestampExpression(operands).evaluate(record, null).getOutputType());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
index 5bc99e8..141bbf5 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
@@ -40,11 +40,11 @@ public class BeamSqlDateCeilExpressionTest extends BeamSqlDateExpressionTestBase
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
     Assert.assertEquals(str2DateTime("2018-01-01 00:00:00"),
         new BeamSqlDateCeilExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getDate());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getDate());
 
     operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
     Assert.assertEquals(str2DateTime("2017-06-01 00:00:00"),
         new BeamSqlDateCeilExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getDate());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getDate());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
index ecab54b..ede12ce 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
@@ -39,11 +39,11 @@ public class BeamSqlDateFloorExpressionTest extends BeamSqlDateExpressionTestBas
     // YEAR
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
     assertEquals(str2DateTime("2017-01-01 00:00:00"),
-        new BeamSqlDateFloorExpression(operands).evaluate(record).getDate());
+        new BeamSqlDateFloorExpression(operands).evaluate(record, null).getDate());
     // MONTH
     operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
     assertEquals(str2DateTime("2017-05-01 00:00:00"),
-        new BeamSqlDateFloorExpression(operands).evaluate(record).getDate());
+        new BeamSqlDateFloorExpression(operands).evaluate(record, null).getDate());
 
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
index 0ca7e3e..b03827a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
@@ -43,7 +43,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(2017L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // MONTH
     operands.clear();
@@ -52,7 +52,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(5L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // DAY
     operands.clear();
@@ -61,7 +61,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(22L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // DAY_OF_WEEK
     operands.clear();
@@ -70,7 +70,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(2L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // DAY_OF_YEAR
     operands.clear();
@@ -79,7 +79,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(142L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // WEEK
     operands.clear();
@@ -88,7 +88,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(21L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // QUARTER
     operands.clear();
@@ -97,7 +97,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(2L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpressionTest.java
index a437db7..c98ce23 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpressionTest.java
@@ -34,14 +34,14 @@ public class BeamSqlNotExpressionTest extends BeamSqlFnExecutorTestBase {
   @Test public void evaluate() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
-    Assert.assertTrue(new BeamSqlNotExpression(operands).evaluate(record).getBoolean());
+    Assert.assertTrue(new BeamSqlNotExpression(operands).evaluate(record, null).getBoolean());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-    Assert.assertFalse(new BeamSqlNotExpression(operands).evaluate(record).getBoolean());
+    Assert.assertFalse(new BeamSqlNotExpression(operands).evaluate(record, null).getBoolean());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null));
-    Assert.assertNull(new BeamSqlNotExpression(operands).evaluate(record).getValue());
+    Assert.assertNull(new BeamSqlNotExpression(operands).evaluate(record, null).getValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
index d42164e..6665253 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
@@ -67,60 +67,66 @@ public class BeamSqlMathBinaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // round(double, double) => double
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.0));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 4.0));
-    Assert.assertEquals(2.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2.0,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
     // round(integer,integer) => integer
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    Assert.assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // round(long,long) => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 5L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
-    Assert.assertEquals(5L, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(5L, new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // round(short) => short
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, new Short("4")));
     Assert.assertEquals(SqlFunctions.toShort(4),
-        new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // round(long,long) => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
-    Assert.assertEquals(2L, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2L, new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // round(double, long) => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    Assert.assertEquals(1.1, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(1.1,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.368768));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    Assert.assertEquals(2.37, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2.37,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 3.78683686458));
-    Assert.assertEquals(4.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(4.0,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 378.683686458));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -2));
-    Assert.assertEquals(400.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(400.0,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 378.683686458));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
-    Assert.assertEquals(380.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(380.0,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // round(integer, double) => integer
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.2));
-    Assert.assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // operand with a BeamSqlInputRefExpression
     // to select a column value from row of a record
@@ -129,7 +135,8 @@ public class BeamSqlMathBinaryExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(ref0);
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
 
-    Assert.assertEquals(1234567L, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(1234567L,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testPowerFunction() {
@@ -139,55 +146,62 @@ public class BeamSqlMathBinaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.0));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 4.0));
-    Assert.assertEquals(16.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(16.0,
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
     // power(integer,integer) => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    Assert.assertEquals(4.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(4.0,
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
     // power(integer,long) => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
-    Assert.assertEquals(8.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(8.0
+        , new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
 
     // power(long,long) => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
-    Assert.assertEquals(4.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(4.0,
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
 
     // power(double, int) => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    Assert.assertEquals(1.1, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(1.1,
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
 
     // power(double, long) => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    Assert.assertEquals(1.1, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(1.1,
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
 
     // power(integer, double) => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.2));
     Assert.assertEquals(Math.pow(2, 2.2),
-        new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForTruncate() {
     List<BeamSqlExpression> operands = new ArrayList<>();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.0));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 4.0));
-    Assert.assertEquals(2.0, new BeamSqlTruncateExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2.0,
+        new BeamSqlTruncateExpression(operands).evaluate(record, null).getValue());
     // truncate(double, integer) => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.80685));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
     Assert.assertEquals(2.8068,
-        new BeamSqlTruncateExpression(operands).evaluate(record).getValue());
+        new BeamSqlTruncateExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForAtan2() {
@@ -195,7 +209,7 @@ public class BeamSqlMathBinaryExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.875));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.56));
     Assert.assertEquals(Math.atan2(0.875, 0.56),
-        new BeamSqlAtan2Expression(operands).evaluate(record).getValue());
+        new BeamSqlAtan2Expression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
index 3f3326b..d80a670 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
@@ -59,8 +59,8 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for abs function
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, -28965734597L));
-    Assert
-        .assertEquals(28965734597L, new BeamSqlAbsExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(28965734597L,
+        new BeamSqlAbsExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForLnExpression() {
@@ -68,18 +68,20 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for LN function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert.assertEquals(Math.log(2), new BeamSqlLnExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(Math.log(2),
+        new BeamSqlLnExpression(operands).evaluate(record, null).getValue());
 
     // test for LN function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert
-        .assertEquals(Math.log(2.4), new BeamSqlLnExpression(operands).evaluate(record).getValue());
+        .assertEquals(Math.log(2.4),
+            new BeamSqlLnExpression(operands).evaluate(record, null).getValue());
     // test for LN function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.log(2.56),
-        new BeamSqlLnExpression(operands).evaluate(record).getValue());
+        new BeamSqlLnExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForLog10Expression() {
@@ -88,17 +90,17 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for log10 function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
     Assert.assertEquals(Math.log10(2),
-        new BeamSqlLogExpression(operands).evaluate(record).getValue());
+        new BeamSqlLogExpression(operands).evaluate(record, null).getValue());
     // test for log10 function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.log10(2.4),
-        new BeamSqlLogExpression(operands).evaluate(record).getValue());
+        new BeamSqlLogExpression(operands).evaluate(record, null).getValue());
     // test for log10 function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.log10(2.56),
-        new BeamSqlLogExpression(operands).evaluate(record).getValue());
+        new BeamSqlLogExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForExpExpression() {
@@ -106,18 +108,18 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert
-        .assertEquals(Math.exp(2), new BeamSqlExpExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(Math.exp(2),
+        new BeamSqlExpExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.exp(2.4),
-        new BeamSqlExpExpression(operands).evaluate(record).getValue());
+        new BeamSqlExpExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.exp(2.56),
-        new BeamSqlExpExpression(operands).evaluate(record).getValue());
+        new BeamSqlExpExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForAcosExpression() {
@@ -125,18 +127,18 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert
-        .assertEquals(Double.NaN, new BeamSqlAcosExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(Double.NaN,
+        new BeamSqlAcosExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
     Assert.assertEquals(Math.acos(0.45),
-        new BeamSqlAcosExpression(operands).evaluate(record).getValue());
+        new BeamSqlAcosExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
     Assert.assertEquals(Math.acos(-0.367),
-        new BeamSqlAcosExpression(operands).evaluate(record).getValue());
+        new BeamSqlAcosExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForAsinExpression() {
@@ -145,12 +147,12 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type double
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
     Assert.assertEquals(Math.asin(0.45),
-        new BeamSqlAsinExpression(operands).evaluate(record).getValue());
+        new BeamSqlAsinExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
     Assert.assertEquals(Math.asin(-0.367),
-        new BeamSqlAsinExpression(operands).evaluate(record).getValue());
+        new BeamSqlAsinExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForAtanExpression() {
@@ -159,12 +161,12 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type double
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
     Assert.assertEquals(Math.atan(0.45),
-        new BeamSqlAtanExpression(operands).evaluate(record).getValue());
+        new BeamSqlAtanExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
     Assert.assertEquals(Math.atan(-0.367),
-        new BeamSqlAtanExpression(operands).evaluate(record).getValue());
+        new BeamSqlAtanExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForCosExpression() {
@@ -173,12 +175,12 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type double
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
     Assert.assertEquals(Math.cos(0.45),
-        new BeamSqlCosExpression(operands).evaluate(record).getValue());
+        new BeamSqlCosExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
     Assert.assertEquals(Math.cos(-0.367),
-        new BeamSqlCosExpression(operands).evaluate(record).getValue());
+        new BeamSqlCosExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForCotExpression() {
@@ -187,12 +189,12 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type double
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, .45));
     Assert.assertEquals(1.0d / Math.tan(0.45),
-        new BeamSqlCotExpression(operands).evaluate(record).getValue());
+        new BeamSqlCotExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-.367)));
     Assert.assertEquals(1.0d / Math.tan(-0.367),
-        new BeamSqlCotExpression(operands).evaluate(record).getValue());
+        new BeamSqlCotExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForDegreesExpression() {
@@ -201,17 +203,17 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
     Assert.assertEquals(Math.toDegrees(2),
-        new BeamSqlDegreesExpression(operands).evaluate(record).getValue());
+        new BeamSqlDegreesExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.toDegrees(2.4),
-        new BeamSqlDegreesExpression(operands).evaluate(record).getValue());
+        new BeamSqlDegreesExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.toDegrees(2.56),
-        new BeamSqlDegreesExpression(operands).evaluate(record).getValue());
+        new BeamSqlDegreesExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForRadiansExpression() {
@@ -220,17 +222,17 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
     Assert.assertEquals(Math.toRadians(2),
-        new BeamSqlRadiansExpression(operands).evaluate(record).getValue());
+        new BeamSqlRadiansExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.toRadians(2.4),
-        new BeamSqlRadiansExpression(operands).evaluate(record).getValue());
+        new BeamSqlRadiansExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.toRadians(2.56),
-        new BeamSqlRadiansExpression(operands).evaluate(record).getValue());
+        new BeamSqlRadiansExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForSinExpression() {
@@ -238,18 +240,18 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert
-        .assertEquals(Math.sin(2), new BeamSqlSinExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(Math.sin(2),
+        new BeamSqlSinExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.sin(2.4),
-        new BeamSqlSinExpression(operands).evaluate(record).getValue());
+        new BeamSqlSinExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.sin(2.56),
-        new BeamSqlSinExpression(operands).evaluate(record).getValue());
+        new BeamSqlSinExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForTanExpression() {
@@ -257,18 +259,18 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert
-        .assertEquals(Math.tan(2), new BeamSqlTanExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(Math.tan(2),
+        new BeamSqlTanExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.tan(2.4),
-        new BeamSqlTanExpression(operands).evaluate(record).getValue());
+        new BeamSqlTanExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.tan(2.56),
-        new BeamSqlTanExpression(operands).evaluate(record).getValue());
+        new BeamSqlTanExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForSignExpression() {
@@ -276,34 +278,35 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert.assertEquals((short) 1, new BeamSqlSignExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals((short) 1
+        , new BeamSqlSignExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
-    Assert.assertEquals(1.0, new BeamSqlSignExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(1.0, new BeamSqlSignExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(BigDecimal.ONE,
-        new BeamSqlSignExpression(operands).evaluate(record).getValue());
+        new BeamSqlSignExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForPi() {
-    Assert.assertEquals(Math.PI, new BeamSqlPiExpression().evaluate(record).getValue());
+    Assert.assertEquals(Math.PI, new BeamSqlPiExpression().evaluate(record, null).getValue());
   }
 
   @Test public void testForCeil() {
     List<BeamSqlExpression> operands = new ArrayList<>();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.68687979));
     Assert.assertEquals(Math.ceil(2.68687979),
-        new BeamSqlCeilExpression(operands).evaluate(record).getValue());
+        new BeamSqlCeilExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForFloor() {
     List<BeamSqlExpression> operands = new ArrayList<>();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.68687979));
     Assert.assertEquals(Math.floor(2.68687979),
-        new BeamSqlFloorExpression(operands).evaluate(record).getValue());
+        new BeamSqlFloorExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
index 118097f..d6c3565 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
@@ -38,7 +38,7 @@ public class BeamSqlCharLengthExpressionTest extends BeamSqlFnExecutorTestBase {
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     assertEquals(5,
-        new BeamSqlCharLengthExpression(operands).evaluate(record).getValue());
+        new BeamSqlCharLengthExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
index c3f8041..c350fe2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
@@ -60,7 +60,7 @@ public class BeamSqlConcatExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " world"));
     Assert.assertEquals("hello world",
-        new BeamSqlConcatExpression(operands).evaluate(record).getValue());
+        new BeamSqlConcatExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
index 24f9945..7ea83d1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
@@ -38,17 +38,17 @@ public class BeamSqlInitCapExpressionTest extends BeamSqlFnExecutorTestBase {
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello world"));
     assertEquals("Hello World",
-        new BeamSqlInitCapExpression(operands).evaluate(record).getValue());
+        new BeamSqlInitCapExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hEllO wOrld"));
     assertEquals("Hello World",
-        new BeamSqlInitCapExpression(operands).evaluate(record).getValue());
+        new BeamSqlInitCapExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello     world"));
     assertEquals("Hello     World",
-        new BeamSqlInitCapExpression(operands).evaluate(record).getValue());
+        new BeamSqlInitCapExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
index e34fcc0..393680c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
@@ -38,7 +38,7 @@ public class BeamSqlLowerExpressionTest extends BeamSqlFnExecutorTestBase {
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "HELLO"));
     assertEquals("hello",
-        new BeamSqlLowerExpression(operands).evaluate(record).getValue());
+        new BeamSqlLowerExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
index 09bbdc8..2b4c0ea 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
@@ -57,7 +57,7 @@ public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     Assert.assertEquals("w3resou3rce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+        new BeamSqlOverlayExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
@@ -65,7 +65,7 @@ public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
     Assert.assertEquals("w3resou33rce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+        new BeamSqlOverlayExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
@@ -73,7 +73,7 @@ public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
     Assert.assertEquals("w3resou3rce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+        new BeamSqlOverlayExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
@@ -81,7 +81,7 @@ public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7));
     Assert.assertEquals("w3resouce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+        new BeamSqlOverlayExpression(operands).evaluate(record, null).getValue());
   }
 
 }