You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/07 21:06:08 UTC

[1/4] beam git commit: Remove redundant windowing information from the BeamRecord itself element window information `BoundedWindow` is added in `BeamSqlExpression`.

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 8f922f74b -> 79880b6ab


http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java
index 4c21a71..3b477cc 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpressionTest.java
@@ -66,19 +66,19 @@ public class BeamSqlPositionExpressionTest extends BeamSqlFnExecutorTestBase {
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
-    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "worldhello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+    assertEquals(5, new BeamSqlPositionExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(-1, new BeamSqlPositionExpression(operands).evaluate(record).getValue());
+    assertEquals(-1, new BeamSqlPositionExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
index 2fb451e..b48a8be 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
@@ -54,48 +54,48 @@ public class BeamSqlSubstringExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     assertEquals("hello",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+        new BeamSqlSubstringExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     assertEquals("he",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+        new BeamSqlSubstringExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
     assertEquals("hello",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+        new BeamSqlSubstringExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100));
     assertEquals("hello",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+        new BeamSqlSubstringExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0));
     assertEquals("",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+        new BeamSqlSubstringExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
     assertEquals("",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+        new BeamSqlSubstringExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
     assertEquals("o",
-        new BeamSqlSubstringExpression(operands).evaluate(record).getValue());
+        new BeamSqlSubstringExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java
index 6f9c706..3645082 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpressionTest.java
@@ -62,26 +62,26 @@ public class BeamSqlTrimExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
     Assert.assertEquals("__hehe",
-        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+        new BeamSqlTrimExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.TRAILING));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hehe__hehe"));
     Assert.assertEquals("hehe__",
-        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+        new BeamSqlTrimExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, SqlTrimFunction.Flag.BOTH));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "he"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "__"));
     Assert.assertEquals("__",
-        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+        new BeamSqlTrimExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " hello "));
     Assert.assertEquals("hello",
-        new BeamSqlTrimExpression(operands).evaluate(record).getValue());
+        new BeamSqlTrimExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void leadingTrim() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java
index e69a3a5..41e5a28 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpressionTest.java
@@ -38,7 +38,7 @@ public class BeamSqlUpperExpressionTest extends BeamSqlFnExecutorTestBase {
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     assertEquals("HELLO",
-        new BeamSqlUpperExpression(operands).evaluate(record).getValue());
+        new BeamSqlUpperExpression(operands).evaluate(record, null).getValue());
   }
 
 }


[3/4] beam git commit: Remove redundant windowing information from the BeamRecord itself element window information `BoundedWindow` is added in `BeamSqlExpression`.

Posted by ta...@apache.org.
Remove redundant windowing information from the BeamRecord itself
element window information `BoundedWindow` is added in `BeamSqlExpression`.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0b1fed1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0b1fed1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0b1fed1

Branch: refs/heads/DSL_SQL
Commit: c0b1fed1c7687a1e44e8ee6997be40aa1785ea51
Parents: 8f922f7
Author: mingmxu <mi...@ebay.com>
Authored: Fri Aug 4 11:12:45 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Fri Aug 4 11:41:56 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BeamRecordCoder.java |  7 --
 .../org/apache/beam/sdk/values/BeamRecord.java  | 36 +-------
 .../interpreter/BeamSqlExpressionExecutor.java  |  5 +-
 .../sql/impl/interpreter/BeamSqlFnExecutor.java |  5 +-
 .../operator/BeamSqlCaseExpression.java         |  9 +-
 .../operator/BeamSqlCastExpression.java         | 31 +++----
 .../interpreter/operator/BeamSqlExpression.java |  9 +-
 .../operator/BeamSqlInputRefExpression.java     |  3 +-
 .../interpreter/operator/BeamSqlPrimitive.java  |  5 +-
 .../operator/BeamSqlReinterpretExpression.java  |  7 +-
 .../operator/BeamSqlUdfExpression.java          |  5 +-
 .../operator/BeamSqlWindowEndExpression.java    | 12 ++-
 .../operator/BeamSqlWindowExpression.java       |  5 +-
 .../operator/BeamSqlWindowStartExpression.java  | 12 ++-
 .../arithmetic/BeamSqlArithmeticExpression.java |  8 +-
 .../comparison/BeamSqlCompareExpression.java    |  7 +-
 .../comparison/BeamSqlIsNotNullExpression.java  |  5 +-
 .../comparison/BeamSqlIsNullExpression.java     |  5 +-
 .../date/BeamSqlCurrentDateExpression.java      |  3 +-
 .../date/BeamSqlCurrentTimeExpression.java      |  3 +-
 .../date/BeamSqlCurrentTimestampExpression.java |  3 +-
 .../date/BeamSqlDateCeilExpression.java         |  5 +-
 .../date/BeamSqlDateFloorExpression.java        |  5 +-
 .../operator/date/BeamSqlExtractExpression.java |  5 +-
 .../operator/logical/BeamSqlAndExpression.java  |  5 +-
 .../operator/logical/BeamSqlNotExpression.java  |  7 +-
 .../operator/logical/BeamSqlOrExpression.java   |  5 +-
 .../math/BeamSqlMathBinaryExpression.java       |  6 +-
 .../math/BeamSqlMathUnaryExpression.java        |  6 +-
 .../operator/math/BeamSqlPiExpression.java      |  3 +-
 .../operator/math/BeamSqlRandExpression.java    |  5 +-
 .../math/BeamSqlRandIntegerExpression.java      |  7 +-
 .../string/BeamSqlCharLengthExpression.java     |  5 +-
 .../string/BeamSqlConcatExpression.java         |  7 +-
 .../string/BeamSqlInitCapExpression.java        |  5 +-
 .../operator/string/BeamSqlLowerExpression.java |  5 +-
 .../string/BeamSqlOverlayExpression.java        | 11 +--
 .../string/BeamSqlPositionExpression.java       |  9 +-
 .../string/BeamSqlSubstringExpression.java      |  9 +-
 .../operator/string/BeamSqlTrimExpression.java  | 11 +--
 .../operator/string/BeamSqlUpperExpression.java |  5 +-
 .../transform/BeamAggregationTransforms.java    |  7 +-
 .../sql/impl/transform/BeamSqlFilterFn.java     |  5 +-
 .../sql/impl/transform/BeamSqlProjectFn.java    |  3 +-
 .../sql/BeamSqlDslAggregationTest.java          | 17 ----
 .../operator/BeamNullExperssionTest.java        |  8 +-
 .../operator/BeamSqlAndOrExpressionTest.java    |  8 +-
 .../operator/BeamSqlCaseExpressionTest.java     |  6 +-
 .../operator/BeamSqlCastExpressionTest.java     | 24 +++---
 .../operator/BeamSqlCompareExpressionTest.java  | 24 +++---
 .../operator/BeamSqlInputRefExpressionTest.java | 12 +--
 .../operator/BeamSqlPrimitiveTest.java          | 10 +--
 .../BeamSqlReinterpretExpressionTest.java       |  2 +-
 .../operator/BeamSqlUdfExpressionTest.java      |  2 +-
 .../BeamSqlArithmeticExpressionTest.java        | 46 +++++-----
 .../date/BeamSqlCurrentDateExpressionTest.java  |  2 +-
 .../date/BeamSqlCurrentTimeExpressionTest.java  |  2 +-
 .../BeamSqlCurrentTimestampExpressionTest.java  |  2 +-
 .../date/BeamSqlDateCeilExpressionTest.java     |  4 +-
 .../date/BeamSqlDateFloorExpressionTest.java    |  4 +-
 .../date/BeamSqlExtractExpressionTest.java      | 14 +--
 .../logical/BeamSqlNotExpressionTest.java       |  6 +-
 .../math/BeamSqlMathBinaryExpressionTest.java   | 58 ++++++++-----
 .../math/BeamSqlMathUnaryExpressionTest.java    | 91 ++++++++++----------
 .../string/BeamSqlCharLengthExpressionTest.java |  2 +-
 .../string/BeamSqlConcatExpressionTest.java     |  2 +-
 .../string/BeamSqlInitCapExpressionTest.java    |  6 +-
 .../string/BeamSqlLowerExpressionTest.java      |  2 +-
 .../string/BeamSqlOverlayExpressionTest.java    |  8 +-
 .../string/BeamSqlPositionExpressionTest.java   |  6 +-
 .../string/BeamSqlSubstringExpressionTest.java  | 14 +--
 .../string/BeamSqlTrimExpressionTest.java       |  8 +-
 .../string/BeamSqlUpperExpressionTest.java      |  2 +-
 73 files changed, 366 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index 06958a4..fe9c295 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.BeamRecordType;
 @Experimental
 public class BeamRecordCoder extends CustomCoder<BeamRecord> {
   private static final BitSetCoder nullListCoder = BitSetCoder.of();
-  private static final InstantCoder instantCoder = InstantCoder.of();
 
   private BeamRecordType recordType;
   private List<Coder> coderArray;
@@ -64,9 +63,6 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
 
       coderArray.get(idx).encode(value.getFieldValue(idx), outStream);
     }
-
-    instantCoder.encode(value.getWindowStart(), outStream);
-    instantCoder.encode(value.getWindowEnd(), outStream);
   }
 
   @Override
@@ -82,9 +78,6 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
       record.addField(idx, coderArray.get(idx).decode(inStream));
     }
 
-    record.setWindowStart(instantCoder.decode(inStream));
-    record.setWindowEnd(instantCoder.decode(inStream));
-
     return record;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index bac649e..8d0aa42 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -24,11 +24,7 @@ import java.util.BitSet;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.joda.time.Instant;
 
 /**
  * {@link org.apache.beam.sdk.values.BeamRecord}, self-described with
@@ -42,9 +38,6 @@ public class BeamRecord implements Serializable {
   private BitSet nullFields;
   private BeamRecordType dataType;
 
-  private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
-  private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
-
   public BeamRecord(BeamRecordType dataType) {
     this.dataType = dataType;
     this.nullFields = new BitSet(dataType.size());
@@ -62,17 +55,6 @@ public class BeamRecord implements Serializable {
     }
   }
 
-  public void updateWindowRange(BeamRecord upstreamRecord, BoundedWindow window){
-    windowStart = upstreamRecord.windowStart;
-    windowEnd = upstreamRecord.windowEnd;
-
-    if (window instanceof IntervalWindow) {
-      IntervalWindow iWindow = (IntervalWindow) window;
-      windowStart = iWindow.start();
-      windowEnd = iWindow.end();
-    }
-  }
-
   public void addField(String fieldName, Object fieldValue) {
     addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
   }
@@ -211,26 +193,10 @@ public class BeamRecord implements Serializable {
     return nullFields.get(idx);
   }
 
-  public Instant getWindowStart() {
-    return windowStart;
-  }
-
-  public Instant getWindowEnd() {
-    return windowEnd;
-  }
-
-  public void setWindowStart(Instant windowStart) {
-    this.windowStart = windowStart;
-  }
-
-  public void setWindowEnd(Instant windowEnd) {
-    this.windowEnd = windowEnd;
-  }
-
   @Override
   public String toString() {
     return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
-        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
+        + dataType + "]";
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
index 3cd6d65..3aaf505 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter;
 
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 
 /**
@@ -34,10 +35,10 @@ public interface BeamSqlExpressionExecutor extends Serializable {
   void prepare();
 
   /**
-   * apply transformation to input record {@link BeamRecord}.
+   * apply transformation to input record {@link BeamRecord} with {@link BoundedWindow}.
    *
    */
-  List<Object> execute(BeamRecord inputRow);
+  List<Object> execute(BeamRecord inputRow, BoundedWindow window);
 
   void close();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 0f77ed8..8f9797b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -88,6 +88,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamS
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
@@ -427,10 +428,10 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
   }
 
   @Override
-  public List<Object> execute(BeamRecord inputRow) {
+  public List<Object> execute(BeamRecord inputRow, BoundedWindow window) {
     List<Object> results = new ArrayList<>();
     for (BeamSqlExpression exp : exps) {
-      results.add(exp.evaluate(inputRow).getValue());
+      results.add(exp.evaluate(inputRow, window).getValue());
     }
     return results;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
index af48cbe..955444f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
@@ -19,6 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -48,16 +49,16 @@ public class BeamSqlCaseExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
     for (int i = 0; i < operands.size() - 1; i += 2) {
-      if (opValueEvaluated(i, inputRow)) {
+      if (opValueEvaluated(i, inputRow, window)) {
         return BeamSqlPrimitive.of(
             outputType,
-            opValueEvaluated(i + 1, inputRow)
+            opValueEvaluated(i + 1, inputRow, window)
         );
       }
     }
     return BeamSqlPrimitive.of(outputType,
-        opValueEvaluated(operands.size() - 1, inputRow));
+        opValueEvaluated(operands.size() - 1, inputRow, window));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
index 3786281..9ea66c1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.runtime.SqlFunctions;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -71,40 +72,40 @@ public class BeamSqlCastExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
     SqlTypeName castOutputType = getOutputType();
     switch (castOutputType) {
       case INTEGER:
         return BeamSqlPrimitive
-            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
+            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow, window)));
       case DOUBLE:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
+        return BeamSqlPrimitive.of(SqlTypeName.DOUBLE,
+            SqlFunctions.toDouble(opValueEvaluated(index, inputRow, window)));
       case SMALLINT:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
+        return BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
+            SqlFunctions.toShort(opValueEvaluated(index, inputRow, window)));
       case TINYINT:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
+        return BeamSqlPrimitive.of(SqlTypeName.TINYINT,
+            SqlFunctions.toByte(opValueEvaluated(index, inputRow, window)));
       case BIGINT:
         return BeamSqlPrimitive
-            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
+            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow, window)));
       case DECIMAL:
         return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
-            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
+            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow, window)));
       case FLOAT:
         return BeamSqlPrimitive
-            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
+            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow, window)));
       case CHAR:
       case VARCHAR:
         return BeamSqlPrimitive
-            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
+            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow, window).toString());
       case DATE:
-        return BeamSqlPrimitive
-            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
+        return BeamSqlPrimitive.of(SqlTypeName.DATE,
+            toDate(opValueEvaluated(index, inputRow, window), outputDateFormat));
       case TIMESTAMP:
         return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-            toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
+            toTimeStamp(opValueEvaluated(index, inputRow, window), outputTimestampFormat));
     }
     throw new UnsupportedOperationException(
         String.format("Cast to type %s not supported", castOutputType));

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
index f42a365..d18b141 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -49,8 +50,8 @@ public abstract class BeamSqlExpression implements Serializable {
     return op(idx).getOutputType();
   }
 
-  public <T> T opValueEvaluated(int idx, BeamRecord row) {
-    return (T) op(idx).evaluate(row).getValue();
+  public <T> T opValueEvaluated(int idx, BeamRecord row, BoundedWindow window) {
+    return (T) op(idx).evaluate(row, window).getValue();
   }
 
   /**
@@ -59,10 +60,10 @@ public abstract class BeamSqlExpression implements Serializable {
   public abstract boolean accept();
 
   /**
-   * Apply input record {@link BeamRecord} to this expression,
+   * Apply input record {@link BeamRecord} with {@link BoundedWindow} to this expression,
    * the output value is wrapped with {@link BeamSqlPrimitive}.
    */
-  public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow);
+  public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window);
 
   public List<BeamSqlExpression> getOperands() {
     return operands;

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
index 8c3d4d4..a2d1624 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -37,7 +38,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
     return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index f763898..9175caa 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -21,13 +21,14 @@ import java.math.BigDecimal;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.NlsString;
 
 /**
  * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
- * It holds the value, and return it directly during {@link #evaluate(BeamRecord)}.
+ * It holds the value, and return it directly during {@link #evaluate(BeamRecord, BoundedWindow)}.
  *
  */
 public class BeamSqlPrimitive<T> extends BeamSqlExpression {
@@ -145,7 +146,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<T> evaluate(BeamRecord inputRow) {
+  public BeamSqlPrimitive<T> evaluate(BeamRecord inputRow, BoundedWindow window) {
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
index c1fa2c7..2ec4fb5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -41,13 +42,13 @@ public class BeamSqlReinterpretExpression extends BeamSqlExpression {
         && SqlTypeName.DATETIME_TYPES.contains(opType(0));
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
     if (opType(0) == SqlTypeName.TIME) {
-      GregorianCalendar date = opValueEvaluated(0, inputRow);
+      GregorianCalendar date = opValueEvaluated(0, inputRow, window);
       return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
 
     } else {
-      Date date = opValueEvaluated(0, inputRow);
+      Date date = opValueEvaluated(0, inputRow, window);
       return BeamSqlPrimitive.of(outputType, date.getTime());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
index da706f3..f1bcb66 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -51,14 +52,14 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+  public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
     if (method == null) {
       reConstructMethod();
     }
     try {
       List<Object> paras = new ArrayList<>();
       for (BeamSqlExpression e : getOperands()) {
-        paras.add(e.evaluate(inputRow).getValue());
+        paras.add(e.evaluate(inputRow, window).getValue());
       }
 
       return BeamSqlPrimitive.of(getOutputType(),

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
index 2f4c165..919612e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.util.Date;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -34,9 +36,13 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
-    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        new Date(inputRow.getWindowEnd().getMillis()));
+  public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow, BoundedWindow window) {
+    if (window instanceof IntervalWindow) {
+      return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, ((IntervalWindow) window).end().toDate());
+    } else {
+      throw new UnsupportedOperationException(
+          "Cannot run HOP_END|TUMBLE_END|SESSION_END on GlobalWindow.");
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
index 2f3dd5c..0298f26 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.util.Date;
 import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -42,9 +43,9 @@ public class BeamSqlWindowExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
+  public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow, BoundedWindow window) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        (Date) operands.get(0).evaluate(inputRow).getValue());
+        (Date) operands.get(0).evaluate(inputRow, window).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
index 9186ec0..4b250a5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
 import java.util.Date;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -35,9 +37,13 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
-    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
-        new Date(inputRow.getWindowStart().getMillis()));
+  public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow, BoundedWindow window) {
+    if (window instanceof IntervalWindow) {
+      return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, ((IntervalWindow) window).start().toDate());
+    } else {
+      throw new UnsupportedOperationException(
+          "Cannot run HOP_START|TUMBLE_START|SESSION_START on GlobalWindow.");
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index fd36457..cc15ff5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -50,11 +51,12 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
     super(operands, outputType);
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow,
+      BoundedWindow window) {
     BigDecimal left = BigDecimal.valueOf(
-        Double.valueOf(opValueEvaluated(0, inputRow).toString()));
+        Double.valueOf(opValueEvaluated(0, inputRow, window).toString()));
     BigDecimal right = BigDecimal.valueOf(
-        Double.valueOf(opValueEvaluated(1, inputRow).toString()));
+        Double.valueOf(opValueEvaluated(1, inputRow, window).toString()));
 
     BigDecimal result = calc(left, right);
     return getCorrectlyTypedResult(result);

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
index 93032ae..df8bd61 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -51,9 +52,9 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
-    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
-    Object rightValue = operands.get(1).evaluate(inputRow).getValue();
+  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow, BoundedWindow window) {
+    Object leftValue = operands.get(0).evaluate(inputRow, window).getValue();
+    Object rightValue = operands.get(1).evaluate(inputRow, window).getValue();
     switch (operands.get(0).getOutputType()) {
     case BIGINT:
     case DECIMAL:

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
index 7177d96..9a9739e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -46,8 +47,8 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
-    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow, BoundedWindow window) {
+    Object leftValue = operands.get(0).evaluate(inputRow, window).getValue();
     return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
index c74fcd9..6034344 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -46,8 +47,8 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
-    Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow, BoundedWindow window) {
+    Object leftValue = operands.get(0).evaluate(inputRow, window).getValue();
     return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
index 86abe43..336772d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Date;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -38,7 +39,7 @@ public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
     return getOperands().size() == 0;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
     return BeamSqlPrimitive.of(outputType, new Date());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
index d8de464..fe3feb8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.TimeZone;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -44,7 +45,7 @@ public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
     return opCount <= 1;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
     GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault());
     ret.setTime(new Date());
     return BeamSqlPrimitive.of(outputType, ret);

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
index 4736571..ca4b3ce 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -42,7 +43,7 @@ public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression {
     return opCount <= 1;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
     return BeamSqlPrimitive.of(outputType, new Date());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
index 55767fa..0e1d3db 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
@@ -41,8 +42,8 @@ public class BeamSqlDateCeilExpression extends BeamSqlExpression {
         && opType(1) == SqlTypeName.SYMBOL;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    Date date = opValueEvaluated(0, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    Date date = opValueEvaluated(0, inputRow, window);
     long time = date.getTime();
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
index 3310da5..2593629 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
@@ -41,8 +42,8 @@ public class BeamSqlDateFloorExpression extends BeamSqlExpression {
         && opType(1) == SqlTypeName.SYMBOL;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    Date date = opValueEvaluated(0, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    Date date = opValueEvaluated(0, inputRow, window);
     long time = date.getTime();
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
index 47cd879..38afd0a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
@@ -61,8 +62,8 @@ public class BeamSqlExtractExpression extends BeamSqlExpression {
         && opType(1) == SqlTypeName.BIGINT;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    Long time = opValueEvaluated(1, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    Long time = opValueEvaluated(1, inputRow, window);
 
     TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
index b8964d5..2cae22b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -32,10 +33,10 @@ public class BeamSqlAndExpression extends BeamSqlLogicalExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow, BoundedWindow window) {
     boolean result = true;
     for (BeamSqlExpression exp : operands) {
-      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow, window);
       result = result && expOut.getValue();
       if (!result) {
         break;

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
index f9578b9..72a6982 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -42,10 +43,10 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
     return super.accept();
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    Boolean value = opValueEvaluated(0, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    Boolean value = opValueEvaluated(0, inputRow, window);
     if (value == null) {
-      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, window);
     } else {
       return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, !value);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
index 88a3916..74dde7a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -32,10 +33,10 @@ public class BeamSqlOrExpression extends BeamSqlLogicalExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow, BoundedWindow window) {
     boolean result = false;
     for (BeamSqlExpression exp : operands) {
-      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow, window);
         result = result || expOut.getValue();
         if (result) {
           break;

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
index 8f6c00c..ed0aac0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -38,10 +39,11 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression {
     return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && isOperandNumeric(opType(1));
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow,
+      BoundedWindow window) {
     BeamSqlExpression leftOp = op(0);
     BeamSqlExpression rightOp = op(1);
-    return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow));
+    return calculate(leftOp.evaluate(inputRow, window), rightOp.evaluate(inputRow, window));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
index b225b8e..b1a210e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -45,9 +46,10 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression {
     return acceptance;
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow,
+      BoundedWindow window) {
     BeamSqlExpression operand = op(0);
-    return calculate(operand.evaluate(inputRow));
+    return calculate(operand.evaluate(inputRow, window));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
index 676f859..3072ea0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
 
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -36,7 +37,7 @@ public class BeamSqlPiExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
     return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
index 0575978..00f2693 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Random;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -42,9 +43,9 @@ public class BeamSqlRandExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamRecord inputRecord) {
+  public BeamSqlPrimitive evaluate(BeamRecord inputRecord, BoundedWindow window) {
     if (operands.size() == 1) {
-      int rowSeed = opValueEvaluated(0, inputRecord);
+      int rowSeed = opValueEvaluated(0, inputRecord, window);
       if (seed == null || seed != rowSeed) {
         rand.setSeed(rowSeed);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
index 52f0cc1..d055de6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Random;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -43,16 +44,16 @@ public class BeamSqlRandIntegerExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamRecord inputRecord) {
+  public BeamSqlPrimitive evaluate(BeamRecord inputRecord, BoundedWindow window) {
     int numericIdx = 0;
     if (operands.size() == 2) {
-      int rowSeed = opValueEvaluated(0, inputRecord);
+      int rowSeed = opValueEvaluated(0, inputRecord, window);
       if (seed == null || seed != rowSeed) {
         rand.setSeed(rowSeed);
       }
       numericIdx = 1;
     }
     return BeamSqlPrimitive.of(SqlTypeName.INTEGER,
-        rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord)));
+        rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord, window)));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
index 974e2bc..5146b14 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -32,8 +33,8 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.INTEGER);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    String str = opValueEvaluated(0, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    String str = opValueEvaluated(0, inputRow, window);
     return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
index 14ef55d..c2f317f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -51,9 +52,9 @@ public class BeamSqlConcatExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    String left = opValueEvaluated(0, inputRow);
-    String right = opValueEvaluated(1, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    String left = opValueEvaluated(0, inputRow, window);
+    String right = opValueEvaluated(1, inputRow, window);
 
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
         new StringBuilder(left.length() + right.length())

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
index e50872b..bf0b8f5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -32,8 +33,8 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    String str = opValueEvaluated(0, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    String str = opValueEvaluated(0, inputRow, window);
 
     StringBuilder ret = new StringBuilder(str);
     boolean isInit = true;

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
index 0f9a501..55f8d6d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -32,8 +33,8 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    String str = opValueEvaluated(0, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    String str = opValueEvaluated(0, inputRow, window);
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
index 2336876..62d5a64 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -54,15 +55,15 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    String str = opValueEvaluated(0, inputRow);
-    String replaceStr = opValueEvaluated(1, inputRow);
-    int idx = opValueEvaluated(2, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    String str = opValueEvaluated(0, inputRow, window);
+    String replaceStr = opValueEvaluated(1, inputRow, window);
+    int idx = opValueEvaluated(2, inputRow, window);
     // the index is 1 based.
     idx -= 1;
     int length = replaceStr.length();
     if (operands.size() == 4) {
-      length = opValueEvaluated(3, inputRow);
+      length = opValueEvaluated(3, inputRow, window);
     }
 
     StringBuilder result = new StringBuilder(

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
index 06dce91..f97547e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -56,12 +57,12 @@ public class BeamSqlPositionExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    String targetStr = opValueEvaluated(0, inputRow);
-    String containingStr = opValueEvaluated(1, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    String targetStr = opValueEvaluated(0, inputRow, window);
+    String containingStr = opValueEvaluated(1, inputRow, window);
     int from = -1;
     if (operands.size() == 3) {
-      Number tmp = opValueEvaluated(2, inputRow);
+      Number tmp = opValueEvaluated(2, inputRow, window);
       from = tmp.intValue();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
index f8582aa..a521ef0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -54,9 +55,9 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    String str = opValueEvaluated(0, inputRow);
-    int idx = opValueEvaluated(1, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    String str = opValueEvaluated(0, inputRow, window);
+    int idx = opValueEvaluated(1, inputRow, window);
     int startIdx = idx;
     if (startIdx > 0) {
       // NOTE: SQL substring is 1 based(rather than 0 based)
@@ -69,7 +70,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
     }
 
     if (operands.size() == 3) {
-      int length = opValueEvaluated(2, inputRow);
+      int length = opValueEvaluated(2, inputRow, window);
       if (length < 0) {
         length = 0;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
index 9c2a7ae..3c3083c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.fun.SqlTrimFunction;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -58,14 +59,14 @@ public class BeamSqlTrimExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
     if (operands.size() == 1) {
       return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-          opValueEvaluated(0, inputRow).toString().trim());
+          opValueEvaluated(0, inputRow, window).toString().trim());
     } else {
-      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
-      String targetStr = opValueEvaluated(1, inputRow);
-      String containingStr = opValueEvaluated(2, inputRow);
+      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow, window);
+      String targetStr = opValueEvaluated(1, inputRow, window);
+      String containingStr = opValueEvaluated(2, inputRow, window);
 
       switch (type) {
         case LEADING:

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
index 94ac2e2..bc29ec8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -32,8 +33,8 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
-    String str = opValueEvaluated(0, inputRow);
+  @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+    String str = opValueEvaluated(0, inputRow, window);
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index dab79a2..ce5444f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.KV;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -75,7 +76,6 @@ public class BeamAggregationTransforms implements Serializable{
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
       BeamRecord outRecord = new BeamRecord(outRowType);
-      outRecord.updateWindowRange(c.element().getKey(), window);
 
       KV<BeamRecord, BeamRecord> kvRecord = c.element();
       for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
@@ -85,7 +85,7 @@ public class BeamAggregationTransforms implements Serializable{
         outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx));
       }
       if (windowStartFieldIdx != -1) {
-        outRecord.addField(windowStartFieldIdx, outRecord.getWindowStart().toDate());
+        outRecord.addField(windowStartFieldIdx, ((IntervalWindow) window).start().toDate());
       }
 
       c.output(outRecord);
@@ -112,7 +112,6 @@ public class BeamAggregationTransforms implements Serializable{
     public BeamRecord apply(BeamRecord input) {
       BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
       BeamRecord keyOfRecord = new BeamRecord(typeOfKey);
-      keyOfRecord.updateWindowRange(input, null);
 
       for (int idx = 0; idx < groupByKeys.size(); ++idx) {
         keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
@@ -223,7 +222,7 @@ public class BeamAggregationTransforms implements Serializable{
       for (int idx = 0; idx < aggregators.size(); ++idx) {
         deltaAcc.accumulatorElements.add(
             aggregators.get(idx).add(accumulator.accumulatorElements.get(idx),
-            sourceFieldExps.get(idx).evaluate(input).getValue()));
+            sourceFieldExps.get(idx).evaluate(input, null).getValue()));
       }
       return deltaAcc;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
index 31efeb7..d3a3f7b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
@@ -21,6 +21,7 @@ import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;
 
 /**
@@ -44,10 +45,10 @@ public class BeamSqlFilterFn extends DoFn<BeamRecord, BeamRecord> {
   }
 
   @ProcessElement
-  public void processElement(ProcessContext c) {
+  public void processElement(ProcessContext c, BoundedWindow window) {
     BeamRecord in = c.element();
 
-    List<Object> result = executor.execute(in);
+    List<Object> result = executor.execute(in, window);
 
     if ((Boolean) result.get(0)) {
       c.output(in);


[2/4] beam git commit: Remove redundant windowing information from the BeamRecord itself element window information `BoundedWindow` is added in `BeamSqlExpression`.

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index a95c743..45dc621 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -52,10 +52,9 @@ public class BeamSqlProjectFn extends DoFn<BeamRecord, BeamRecord> {
   @ProcessElement
   public void processElement(ProcessContext c, BoundedWindow window) {
     BeamRecord inputRow = c.element();
-    List<Object> results = executor.execute(inputRow);
+    List<Object> results = executor.execute(inputRow, window);
 
     BeamRecord outRow = new BeamRecord(outputRowType);
-    outRow.updateWindowRange(inputRow, window);
 
     for (int idx = 0; idx < results.size(); ++idx) {
       BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 8501157..71278ec 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
 import org.junit.Test;
 
 /**
@@ -224,15 +223,11 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
     record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
-    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
 
     BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 1L);
     record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
-    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
-    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
 
     PAssert.that(result).containsInAnyOrder(record1, record2);
 
@@ -271,29 +266,21 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
     record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
-    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime()));
-    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
 
     BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 3L);
     record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
-    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
 
     BeamRecord record3 = new BeamRecord(resultType);
     record3.addField("f_int2", 0);
     record3.addField("size", 1L);
     record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
-    record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
-    record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime()));
 
     BeamRecord record4 = new BeamRecord(resultType);
     record4.addField("f_int2", 0);
     record4.addField("size", 1L);
     record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
-    record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
-    record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
 
     PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
 
@@ -333,15 +320,11 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     record1.addField("f_int2", 0);
     record1.addField("size", 3L);
     record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
-    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime()));
-    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime()));
 
     BeamRecord record2 = new BeamRecord(resultType);
     record2.addField("f_int2", 0);
     record2.addField("size", 1L);
     record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
-    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime()));
-    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime()));
 
     PAssert.that(result).containsInAnyOrder(record1, record2);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
index 5278871..1bcda2c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamNullExperssionTest.java
@@ -34,22 +34,22 @@ public class BeamNullExperssionTest extends BeamSqlFnExecutorTestBase {
   public void testIsNull() {
     BeamSqlIsNullExpression exp1 = new BeamSqlIsNullExpression(
         new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
-    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+    Assert.assertEquals(false, exp1.evaluate(record, null).getValue());
 
     BeamSqlIsNullExpression exp2 = new BeamSqlIsNullExpression(
         BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
-    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+    Assert.assertEquals(true, exp2.evaluate(record, null).getValue());
   }
 
   @Test
   public void testIsNotNull() {
     BeamSqlIsNotNullExpression exp1 = new BeamSqlIsNotNullExpression(
         new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
-    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+    Assert.assertEquals(true, exp1.evaluate(record, null).getValue());
 
     BeamSqlIsNotNullExpression exp2 = new BeamSqlIsNotNullExpression(
         BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
-    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+    Assert.assertEquals(false, exp2.evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
index f6e33b5..51a170d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlAndOrExpressionTest.java
@@ -37,11 +37,11 @@ public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
 
-    Assert.assertTrue(new BeamSqlAndExpression(operands).evaluate(record).getValue());
+    Assert.assertTrue(new BeamSqlAndExpression(operands).evaluate(record, null).getValue());
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
 
-    Assert.assertFalse(new BeamSqlAndExpression(operands).evaluate(record).getValue());
+    Assert.assertFalse(new BeamSqlAndExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test
@@ -50,11 +50,11 @@ public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
 
-    Assert.assertFalse(new BeamSqlOrExpression(operands).evaluate(record).getValue());
+    Assert.assertFalse(new BeamSqlOrExpression(operands).evaluate(record, null).getValue());
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
 
-    Assert.assertTrue(new BeamSqlOrExpression(operands).evaluate(record).getValue());
+    Assert.assertTrue(new BeamSqlOrExpression(operands).evaluate(record, null).getValue());
 
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
index 068f041..e02554f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpressionTest.java
@@ -72,14 +72,14 @@ public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
     assertEquals("hello", new BeamSqlCaseExpression(operands)
-        .evaluate(record).getValue());
+        .evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
     assertEquals("world", new BeamSqlCaseExpression(operands)
-        .evaluate(record).getValue());
+        .evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
@@ -88,6 +88,6 @@ public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello1"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "world"));
     assertEquals("hello1", new BeamSqlCaseExpression(operands)
-        .evaluate(record).getValue());
+        .evaluate(record, null).getValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
index 0c0aaa5..f4e3cf9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpressionTest.java
@@ -52,14 +52,14 @@ public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
   public void testForIntegerToBigintTypeCasting() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
     Assert.assertEquals(5L,
-        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record, null).getLong());
   }
 
   @Test
   public void testForDoubleToBigIntCasting() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45));
     Assert.assertEquals(5L,
-        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record, null).getLong());
   }
 
   @Test
@@ -67,7 +67,7 @@ public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for yyyyMMdd format
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521));
     Assert.assertEquals(Date.valueOf("2017-05-21"),
-        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record, null).getValue());
   }
 
   @Test
@@ -75,7 +75,7 @@ public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
     //test for yyyy-MM-dd format
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21"));
     Assert.assertEquals(Date.valueOf("2017-05-21"),
-        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record, null).getValue());
   }
 
   @Test
@@ -83,14 +83,14 @@ public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for yy.MM.dd format
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21"));
     Assert.assertEquals(Date.valueOf("2017-05-21"),
-        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record, null).getValue());
   }
 
   @Test
   public void testForTimestampCastExpression() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989"));
     Assert.assertEquals(SqlTypeName.TIMESTAMP,
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record)
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record, null)
             .getOutputType());
   }
 
@@ -98,28 +98,32 @@ public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
   public void testDateTimeFormatWithMillis() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989"));
     Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP)
+          .evaluate(record, null).getValue());
   }
 
   @Test
   public void testDateTimeFormatWithTimezone() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST"));
     Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP)
+          .evaluate(record, null).getValue());
   }
 
   @Test
   public void testDateTimeFormat() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59"));
     Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP)
+          .evaluate(record, null).getValue());
   }
 
   @Test(expected = RuntimeException.class)
   public void testForCastTypeNotSupported() {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime()));
     Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
-        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP)
+          .evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
index ae3a12f..8aad6b3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCompareExpressionTest.java
@@ -40,12 +40,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlEqualsExpression exp1 = new BeamSqlEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 100L)));
-    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+    Assert.assertEquals(false, exp1.evaluate(record, null).getValue());
 
     BeamSqlEqualsExpression exp2 = new BeamSqlEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
-    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+    Assert.assertEquals(true, exp2.evaluate(record, null).getValue());
   }
 
   @Test
@@ -53,12 +53,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlGreaterThanExpression exp1 = new BeamSqlGreaterThanExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
-    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+    Assert.assertEquals(false, exp1.evaluate(record, null).getValue());
 
     BeamSqlGreaterThanExpression exp2 = new BeamSqlGreaterThanExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234566L)));
-    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+    Assert.assertEquals(true, exp2.evaluate(record, null).getValue());
   }
 
   @Test
@@ -66,12 +66,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlGreaterThanOrEqualsExpression exp1 = new BeamSqlGreaterThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
-    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+    Assert.assertEquals(true, exp1.evaluate(record, null).getValue());
 
     BeamSqlGreaterThanOrEqualsExpression exp2 = new BeamSqlGreaterThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234568L)));
-    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+    Assert.assertEquals(false, exp2.evaluate(record, null).getValue());
   }
 
   @Test
@@ -79,12 +79,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlLessThanExpression exp1 = new BeamSqlLessThanExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1),
             BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)));
-    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+    Assert.assertEquals(true, exp1.evaluate(record, null).getValue());
 
     BeamSqlLessThanExpression exp2 = new BeamSqlLessThanExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1),
             BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1)));
-    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+    Assert.assertEquals(false, exp2.evaluate(record, null).getValue());
   }
 
   @Test
@@ -92,12 +92,12 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlLessThanOrEqualsExpression exp1 = new BeamSqlLessThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2),
             BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.9)));
-    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+    Assert.assertEquals(true, exp1.evaluate(record, null).getValue());
 
     BeamSqlLessThanOrEqualsExpression exp2 = new BeamSqlLessThanOrEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2),
             BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.0)));
-    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+    Assert.assertEquals(false, exp2.evaluate(record, null).getValue());
   }
 
   @Test
@@ -105,11 +105,11 @@ public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlNotEqualsExpression exp1 = new BeamSqlNotEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
-    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+    Assert.assertEquals(false, exp1.evaluate(record, null).getValue());
 
     BeamSqlNotEqualsExpression exp2 = new BeamSqlNotEqualsExpression(
         Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3),
             BeamSqlPrimitive.of(SqlTypeName.BIGINT, 0L)));
-    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+    Assert.assertEquals(true, exp2.evaluate(record, null).getValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
index c78f9c0..e543d4f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
@@ -30,28 +30,28 @@ public class BeamSqlInputRefExpressionTest extends BeamSqlFnExecutorTestBase {
   @Test
   public void testRefInRange() {
     BeamSqlInputRefExpression ref0 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
-    Assert.assertEquals(record.getLong(0), ref0.evaluate(record).getValue());
+    Assert.assertEquals(record.getLong(0), ref0.evaluate(record, null).getValue());
 
     BeamSqlInputRefExpression ref1 = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1);
-    Assert.assertEquals(record.getInteger(1), ref1.evaluate(record).getValue());
+    Assert.assertEquals(record.getInteger(1), ref1.evaluate(record, null).getValue());
 
     BeamSqlInputRefExpression ref2 = new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2);
-    Assert.assertEquals(record.getDouble(2), ref2.evaluate(record).getValue());
+    Assert.assertEquals(record.getDouble(2), ref2.evaluate(record, null).getValue());
 
     BeamSqlInputRefExpression ref3 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3);
-    Assert.assertEquals(record.getLong(3), ref3.evaluate(record).getValue());
+    Assert.assertEquals(record.getLong(3), ref3.evaluate(record, null).getValue());
   }
 
 
   @Test(expected = IndexOutOfBoundsException.class)
   public void testRefOutOfRange(){
     BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 4);
-    ref.evaluate(record).getValue();
+    ref.evaluate(record, null).getValue();
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testTypeUnMatch(){
     BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 0);
-    ref.evaluate(record).getValue();
+    ref.evaluate(record, null).getValue();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
index c4e3d3f..81f9ce0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitiveTest.java
@@ -31,28 +31,28 @@ public class BeamSqlPrimitiveTest extends BeamSqlFnExecutorTestBase {
   @Test
   public void testPrimitiveInt(){
     BeamSqlPrimitive<Integer> expInt = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100);
-    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record, null).getValue());
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testPrimitiveTypeUnMatch1(){
     BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100L);
-    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record, null).getValue());
   }
   @Test(expected = IllegalArgumentException.class)
   public void testPrimitiveTypeUnMatch2(){
     BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.DECIMAL, 100L);
-    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record, null).getValue());
   }
   @Test(expected = IllegalArgumentException.class)
   public void testPrimitiveTypeUnMatch3(){
     BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.FLOAT, 100L);
-    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record, null).getValue());
   }
   @Test(expected = IllegalArgumentException.class)
   public void testPrimitiveTypeUnMatch4(){
     BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 100L);
-    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record, null).getValue());
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
index 2e01737..e614fdf 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpressionTest.java
@@ -69,7 +69,7 @@ public class BeamSqlReinterpretExpressionTest extends BeamSqlFnExecutorTestBase
     d.setTime(1000);
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, d));
     assertEquals(1000L, new BeamSqlReinterpretExpression(operands, SqlTypeName.BIGINT)
-        .evaluate(record).getValue());
+        .evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
index c4732f5..19098a6 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpressionTest.java
@@ -37,7 +37,7 @@ public class BeamSqlUdfExpressionTest extends BeamSqlFnExecutorTestBase {
     BeamSqlUdfExpression exp = new BeamSqlUdfExpression(
         UdfFn.class.getMethod("negative", Integer.class), operands, SqlTypeName.INTEGER);
 
-    Assert.assertEquals(-10, exp.evaluate(record).getValue());
+    Assert.assertEquals(-10, exp.evaluate(record, null).getValue());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
index 44001f9..a8d5e43 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
@@ -84,32 +84,32 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => integer
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(2, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+    assertEquals(2, new BeamSqlPlusExpression(operands).evaluate(record, null).getValue());
 
     // integer + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record, null).getValue());
 
     // long + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record, null).getValue());
 
     // float + long => float
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     assertEquals(Float.valueOf(1.1F + 1),
-        new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+        new BeamSqlPlusExpression(operands).evaluate(record, null).getValue());
 
     // double + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2.1, new BeamSqlPlusExpression(operands).evaluate(record).getValue());
+    assertEquals(2.1, new BeamSqlPlusExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testMinus() {
@@ -118,32 +118,32 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => long
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(1, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+    assertEquals(1, new BeamSqlMinusExpression(operands).evaluate(record, null).getValue());
 
     // integer + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record, null).getValue());
 
     // long + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+    assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record, null).getValue());
 
     // float + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     assertEquals(2.1F - 1L,
-        new BeamSqlMinusExpression(operands).evaluate(record).getValue().floatValue(), 0.1);
+        new BeamSqlMinusExpression(operands).evaluate(record, null).getValue().floatValue(), 0.1);
 
     // double + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(1.1, new BeamSqlMinusExpression(operands).evaluate(record).getValue());
+    assertEquals(1.1, new BeamSqlMinusExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testMultiply() {
@@ -152,32 +152,32 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => integer
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(2, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+    assertEquals(2, new BeamSqlMultiplyExpression(operands).evaluate(record, null).getValue());
 
     // integer + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record, null).getValue());
 
     // long + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record, null).getValue());
 
     // float + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     assertEquals(Float.valueOf(2.1F * 1L),
-        new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+        new BeamSqlMultiplyExpression(operands).evaluate(record, null).getValue());
 
     // double + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2.1, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue());
+    assertEquals(2.1, new BeamSqlMultiplyExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testDivide() {
@@ -186,32 +186,32 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => integer
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    assertEquals(2, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+    assertEquals(2, new BeamSqlDivideExpression(operands).evaluate(record, null).getValue());
 
     // integer + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record, null).getValue());
 
     // long + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+    assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record, null).getValue());
 
     // float + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
     assertEquals(2.1F / 1,
-        new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+        new BeamSqlDivideExpression(operands).evaluate(record, null).getValue());
 
     // double + long => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    assertEquals(2.1, new BeamSqlDivideExpression(operands).evaluate(record).getValue());
+    assertEquals(2.1, new BeamSqlDivideExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testMod() {
@@ -220,18 +220,18 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
     // integer + integer => long
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    assertEquals(1, new BeamSqlModExpression(operands).evaluate(record).getValue());
+    assertEquals(1, new BeamSqlModExpression(operands).evaluate(record, null).getValue());
 
     // integer + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
-    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
+    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record, null).getValue());
 
     // long + long => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
-    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue());
+    assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record, null).getValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
index cd390c4..bfca720 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
@@ -32,7 +32,7 @@ public class BeamSqlCurrentDateExpressionTest extends BeamSqlDateExpressionTestB
     Assert.assertEquals(
         SqlTypeName.DATE,
         new BeamSqlCurrentDateExpression()
-            .evaluate(BeamSqlFnExecutorTestBase.record).getOutputType()
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getOutputType()
     );
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
index 416df01..af3cacd 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
@@ -34,6 +34,6 @@ public class BeamSqlCurrentTimeExpressionTest extends BeamSqlDateExpressionTestB
   public void test() {
     List<BeamSqlExpression> operands = new ArrayList<>();
     assertEquals(SqlTypeName.TIME,
-        new BeamSqlCurrentTimeExpression(operands).evaluate(record).getOutputType());
+        new BeamSqlCurrentTimeExpression(operands).evaluate(record, null).getOutputType());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
index d44b6c1..c171e40 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpressionTest.java
@@ -34,6 +34,6 @@ public class BeamSqlCurrentTimestampExpressionTest extends BeamSqlDateExpression
   public void test() {
     List<BeamSqlExpression> operands = new ArrayList<>();
     assertEquals(SqlTypeName.TIMESTAMP,
-        new BeamSqlCurrentTimestampExpression(operands).evaluate(record).getOutputType());
+        new BeamSqlCurrentTimestampExpression(operands).evaluate(record, null).getOutputType());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
index 5bc99e8..141bbf5 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
@@ -40,11 +40,11 @@ public class BeamSqlDateCeilExpressionTest extends BeamSqlDateExpressionTestBase
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
     Assert.assertEquals(str2DateTime("2018-01-01 00:00:00"),
         new BeamSqlDateCeilExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getDate());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getDate());
 
     operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
     Assert.assertEquals(str2DateTime("2017-06-01 00:00:00"),
         new BeamSqlDateCeilExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getDate());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getDate());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
index ecab54b..ede12ce 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
@@ -39,11 +39,11 @@ public class BeamSqlDateFloorExpressionTest extends BeamSqlDateExpressionTestBas
     // YEAR
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
     assertEquals(str2DateTime("2017-01-01 00:00:00"),
-        new BeamSqlDateFloorExpression(operands).evaluate(record).getDate());
+        new BeamSqlDateFloorExpression(operands).evaluate(record, null).getDate());
     // MONTH
     operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
     assertEquals(str2DateTime("2017-05-01 00:00:00"),
-        new BeamSqlDateFloorExpression(operands).evaluate(record).getDate());
+        new BeamSqlDateFloorExpression(operands).evaluate(record, null).getDate());
 
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
index 0ca7e3e..b03827a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpressionTest.java
@@ -43,7 +43,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(2017L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // MONTH
     operands.clear();
@@ -52,7 +52,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(5L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // DAY
     operands.clear();
@@ -61,7 +61,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(22L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // DAY_OF_WEEK
     operands.clear();
@@ -70,7 +70,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(2L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // DAY_OF_YEAR
     operands.clear();
@@ -79,7 +79,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(142L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // WEEK
     operands.clear();
@@ -88,7 +88,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(21L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
     // QUARTER
     operands.clear();
@@ -97,7 +97,7 @@ public class BeamSqlExtractExpressionTest extends BeamSqlDateExpressionTestBase
         time));
     assertEquals(2L,
         new BeamSqlExtractExpression(operands)
-            .evaluate(BeamSqlFnExecutorTestBase.record).getValue());
+            .evaluate(BeamSqlFnExecutorTestBase.record, null).getValue());
 
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpressionTest.java
index a437db7..c98ce23 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpressionTest.java
@@ -34,14 +34,14 @@ public class BeamSqlNotExpressionTest extends BeamSqlFnExecutorTestBase {
   @Test public void evaluate() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
-    Assert.assertTrue(new BeamSqlNotExpression(operands).evaluate(record).getBoolean());
+    Assert.assertTrue(new BeamSqlNotExpression(operands).evaluate(record, null).getBoolean());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
-    Assert.assertFalse(new BeamSqlNotExpression(operands).evaluate(record).getBoolean());
+    Assert.assertFalse(new BeamSqlNotExpression(operands).evaluate(record, null).getBoolean());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null));
-    Assert.assertNull(new BeamSqlNotExpression(operands).evaluate(record).getValue());
+    Assert.assertNull(new BeamSqlNotExpression(operands).evaluate(record, null).getValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
index d42164e..6665253 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpressionTest.java
@@ -67,60 +67,66 @@ public class BeamSqlMathBinaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // round(double, double) => double
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.0));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 4.0));
-    Assert.assertEquals(2.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2.0,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
     // round(integer,integer) => integer
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    Assert.assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // round(long,long) => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 5L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
-    Assert.assertEquals(5L, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(5L, new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // round(short) => short
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, new Short("4")));
     Assert.assertEquals(SqlFunctions.toShort(4),
-        new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // round(long,long) => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
-    Assert.assertEquals(2L, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2L, new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // round(double, long) => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    Assert.assertEquals(1.1, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(1.1,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.368768));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    Assert.assertEquals(2.37, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2.37,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 3.78683686458));
-    Assert.assertEquals(4.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(4.0,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 378.683686458));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -2));
-    Assert.assertEquals(400.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(400.0,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 378.683686458));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1));
-    Assert.assertEquals(380.0, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(380.0,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // round(integer, double) => integer
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.2));
-    Assert.assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2, new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
 
     // operand with a BeamSqlInputRefExpression
     // to select a column value from row of a record
@@ -129,7 +135,8 @@ public class BeamSqlMathBinaryExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(ref0);
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
 
-    Assert.assertEquals(1234567L, new BeamSqlRoundExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(1234567L,
+        new BeamSqlRoundExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testPowerFunction() {
@@ -139,55 +146,62 @@ public class BeamSqlMathBinaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.0));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 4.0));
-    Assert.assertEquals(16.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(16.0,
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
     // power(integer,integer) => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
-    Assert.assertEquals(4.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(4.0,
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
     // power(integer,long) => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L));
-    Assert.assertEquals(8.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(8.0
+        , new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
 
     // power(long,long) => long
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L));
-    Assert.assertEquals(4.0, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(4.0,
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
 
     // power(double, int) => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
-    Assert.assertEquals(1.1, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(1.1,
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
 
     // power(double, long) => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 1.1));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L));
-    Assert.assertEquals(1.1, new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(1.1,
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
 
     // power(integer, double) => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.2));
     Assert.assertEquals(Math.pow(2, 2.2),
-        new BeamSqlPowerExpression(operands).evaluate(record).getValue());
+        new BeamSqlPowerExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForTruncate() {
     List<BeamSqlExpression> operands = new ArrayList<>();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.0));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 4.0));
-    Assert.assertEquals(2.0, new BeamSqlTruncateExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(2.0,
+        new BeamSqlTruncateExpression(operands).evaluate(record, null).getValue());
     // truncate(double, integer) => double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.80685));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
     Assert.assertEquals(2.8068,
-        new BeamSqlTruncateExpression(operands).evaluate(record).getValue());
+        new BeamSqlTruncateExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForAtan2() {
@@ -195,7 +209,7 @@ public class BeamSqlMathBinaryExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.875));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.56));
     Assert.assertEquals(Math.atan2(0.875, 0.56),
-        new BeamSqlAtan2Expression(operands).evaluate(record).getValue());
+        new BeamSqlAtan2Expression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
index 3f3326b..d80a670 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
@@ -59,8 +59,8 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for abs function
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, -28965734597L));
-    Assert
-        .assertEquals(28965734597L, new BeamSqlAbsExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(28965734597L,
+        new BeamSqlAbsExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForLnExpression() {
@@ -68,18 +68,20 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for LN function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert.assertEquals(Math.log(2), new BeamSqlLnExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(Math.log(2),
+        new BeamSqlLnExpression(operands).evaluate(record, null).getValue());
 
     // test for LN function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert
-        .assertEquals(Math.log(2.4), new BeamSqlLnExpression(operands).evaluate(record).getValue());
+        .assertEquals(Math.log(2.4),
+            new BeamSqlLnExpression(operands).evaluate(record, null).getValue());
     // test for LN function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.log(2.56),
-        new BeamSqlLnExpression(operands).evaluate(record).getValue());
+        new BeamSqlLnExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForLog10Expression() {
@@ -88,17 +90,17 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for log10 function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
     Assert.assertEquals(Math.log10(2),
-        new BeamSqlLogExpression(operands).evaluate(record).getValue());
+        new BeamSqlLogExpression(operands).evaluate(record, null).getValue());
     // test for log10 function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.log10(2.4),
-        new BeamSqlLogExpression(operands).evaluate(record).getValue());
+        new BeamSqlLogExpression(operands).evaluate(record, null).getValue());
     // test for log10 function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.log10(2.56),
-        new BeamSqlLogExpression(operands).evaluate(record).getValue());
+        new BeamSqlLogExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForExpExpression() {
@@ -106,18 +108,18 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert
-        .assertEquals(Math.exp(2), new BeamSqlExpExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(Math.exp(2),
+        new BeamSqlExpExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.exp(2.4),
-        new BeamSqlExpExpression(operands).evaluate(record).getValue());
+        new BeamSqlExpExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.exp(2.56),
-        new BeamSqlExpExpression(operands).evaluate(record).getValue());
+        new BeamSqlExpExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForAcosExpression() {
@@ -125,18 +127,18 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert
-        .assertEquals(Double.NaN, new BeamSqlAcosExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(Double.NaN,
+        new BeamSqlAcosExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
     Assert.assertEquals(Math.acos(0.45),
-        new BeamSqlAcosExpression(operands).evaluate(record).getValue());
+        new BeamSqlAcosExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
     Assert.assertEquals(Math.acos(-0.367),
-        new BeamSqlAcosExpression(operands).evaluate(record).getValue());
+        new BeamSqlAcosExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForAsinExpression() {
@@ -145,12 +147,12 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type double
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
     Assert.assertEquals(Math.asin(0.45),
-        new BeamSqlAsinExpression(operands).evaluate(record).getValue());
+        new BeamSqlAsinExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
     Assert.assertEquals(Math.asin(-0.367),
-        new BeamSqlAsinExpression(operands).evaluate(record).getValue());
+        new BeamSqlAsinExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForAtanExpression() {
@@ -159,12 +161,12 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type double
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
     Assert.assertEquals(Math.atan(0.45),
-        new BeamSqlAtanExpression(operands).evaluate(record).getValue());
+        new BeamSqlAtanExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
     Assert.assertEquals(Math.atan(-0.367),
-        new BeamSqlAtanExpression(operands).evaluate(record).getValue());
+        new BeamSqlAtanExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForCosExpression() {
@@ -173,12 +175,12 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type double
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 0.45));
     Assert.assertEquals(Math.cos(0.45),
-        new BeamSqlCosExpression(operands).evaluate(record).getValue());
+        new BeamSqlCosExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-0.367)));
     Assert.assertEquals(Math.cos(-0.367),
-        new BeamSqlCosExpression(operands).evaluate(record).getValue());
+        new BeamSqlCosExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForCotExpression() {
@@ -187,12 +189,12 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type double
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, .45));
     Assert.assertEquals(1.0d / Math.tan(0.45),
-        new BeamSqlCotExpression(operands).evaluate(record).getValue());
+        new BeamSqlCotExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(-.367)));
     Assert.assertEquals(1.0d / Math.tan(-0.367),
-        new BeamSqlCotExpression(operands).evaluate(record).getValue());
+        new BeamSqlCotExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForDegreesExpression() {
@@ -201,17 +203,17 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
     Assert.assertEquals(Math.toDegrees(2),
-        new BeamSqlDegreesExpression(operands).evaluate(record).getValue());
+        new BeamSqlDegreesExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.toDegrees(2.4),
-        new BeamSqlDegreesExpression(operands).evaluate(record).getValue());
+        new BeamSqlDegreesExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.toDegrees(2.56),
-        new BeamSqlDegreesExpression(operands).evaluate(record).getValue());
+        new BeamSqlDegreesExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForRadiansExpression() {
@@ -220,17 +222,17 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
     Assert.assertEquals(Math.toRadians(2),
-        new BeamSqlRadiansExpression(operands).evaluate(record).getValue());
+        new BeamSqlRadiansExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.toRadians(2.4),
-        new BeamSqlRadiansExpression(operands).evaluate(record).getValue());
+        new BeamSqlRadiansExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.toRadians(2.56),
-        new BeamSqlRadiansExpression(operands).evaluate(record).getValue());
+        new BeamSqlRadiansExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForSinExpression() {
@@ -238,18 +240,18 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert
-        .assertEquals(Math.sin(2), new BeamSqlSinExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(Math.sin(2),
+        new BeamSqlSinExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.sin(2.4),
-        new BeamSqlSinExpression(operands).evaluate(record).getValue());
+        new BeamSqlSinExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.sin(2.56),
-        new BeamSqlSinExpression(operands).evaluate(record).getValue());
+        new BeamSqlSinExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForTanExpression() {
@@ -257,18 +259,18 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert
-        .assertEquals(Math.tan(2), new BeamSqlTanExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(Math.tan(2),
+        new BeamSqlTanExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
     Assert.assertEquals(Math.tan(2.4),
-        new BeamSqlTanExpression(operands).evaluate(record).getValue());
+        new BeamSqlTanExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(Math.tan(2.56),
-        new BeamSqlTanExpression(operands).evaluate(record).getValue());
+        new BeamSqlTanExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForSignExpression() {
@@ -276,34 +278,35 @@ public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
     // test for exp function with operand type smallint
     operands.add(BeamSqlPrimitive.of(SqlTypeName.SMALLINT, Short.valueOf("2")));
-    Assert.assertEquals((short) 1, new BeamSqlSignExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals((short) 1
+        , new BeamSqlSignExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type double
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.4));
-    Assert.assertEquals(1.0, new BeamSqlSignExpression(operands).evaluate(record).getValue());
+    Assert.assertEquals(1.0, new BeamSqlSignExpression(operands).evaluate(record, null).getValue());
     // test for exp function with operand type decimal
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DECIMAL, BigDecimal.valueOf(2.56)));
     Assert.assertEquals(BigDecimal.ONE,
-        new BeamSqlSignExpression(operands).evaluate(record).getValue());
+        new BeamSqlSignExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForPi() {
-    Assert.assertEquals(Math.PI, new BeamSqlPiExpression().evaluate(record).getValue());
+    Assert.assertEquals(Math.PI, new BeamSqlPiExpression().evaluate(record, null).getValue());
   }
 
   @Test public void testForCeil() {
     List<BeamSqlExpression> operands = new ArrayList<>();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.68687979));
     Assert.assertEquals(Math.ceil(2.68687979),
-        new BeamSqlCeilExpression(operands).evaluate(record).getValue());
+        new BeamSqlCeilExpression(operands).evaluate(record, null).getValue());
   }
 
   @Test public void testForFloor() {
     List<BeamSqlExpression> operands = new ArrayList<>();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 2.68687979));
     Assert.assertEquals(Math.floor(2.68687979),
-        new BeamSqlFloorExpression(operands).evaluate(record).getValue());
+        new BeamSqlFloorExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
index 118097f..d6c3565 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
@@ -38,7 +38,7 @@ public class BeamSqlCharLengthExpressionTest extends BeamSqlFnExecutorTestBase {
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     assertEquals(5,
-        new BeamSqlCharLengthExpression(operands).evaluate(record).getValue());
+        new BeamSqlCharLengthExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
index c3f8041..c350fe2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpressionTest.java
@@ -60,7 +60,7 @@ public class BeamSqlConcatExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, " world"));
     Assert.assertEquals("hello world",
-        new BeamSqlConcatExpression(operands).evaluate(record).getValue());
+        new BeamSqlConcatExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
index 24f9945..7ea83d1 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
@@ -38,17 +38,17 @@ public class BeamSqlInitCapExpressionTest extends BeamSqlFnExecutorTestBase {
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello world"));
     assertEquals("Hello World",
-        new BeamSqlInitCapExpression(operands).evaluate(record).getValue());
+        new BeamSqlInitCapExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hEllO wOrld"));
     assertEquals("Hello World",
-        new BeamSqlInitCapExpression(operands).evaluate(record).getValue());
+        new BeamSqlInitCapExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello     world"));
     assertEquals("Hello     World",
-        new BeamSqlInitCapExpression(operands).evaluate(record).getValue());
+        new BeamSqlInitCapExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
index e34fcc0..393680c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpressionTest.java
@@ -38,7 +38,7 @@ public class BeamSqlLowerExpressionTest extends BeamSqlFnExecutorTestBase {
 
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "HELLO"));
     assertEquals("hello",
-        new BeamSqlLowerExpression(operands).evaluate(record).getValue());
+        new BeamSqlLowerExpression(operands).evaluate(record, null).getValue());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
index 09bbdc8..2b4c0ea 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
@@ -57,7 +57,7 @@ public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "resou"));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     Assert.assertEquals("w3resou3rce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+        new BeamSqlOverlayExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
@@ -65,7 +65,7 @@ public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4));
     Assert.assertEquals("w3resou33rce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+        new BeamSqlOverlayExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
@@ -73,7 +73,7 @@ public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
     Assert.assertEquals("w3resou3rce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+        new BeamSqlOverlayExpression(operands).evaluate(record, null).getValue());
 
     operands.clear();
     operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "w3333333rce"));
@@ -81,7 +81,7 @@ public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3));
     operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 7));
     Assert.assertEquals("w3resouce",
-        new BeamSqlOverlayExpression(operands).evaluate(record).getValue());
+        new BeamSqlOverlayExpression(operands).evaluate(record, null).getValue());
   }
 
 }


[4/4] beam git commit: [BEAM-2722] This closes #3689

Posted by ta...@apache.org.
[BEAM-2722] This closes #3689


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/79880b6a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/79880b6a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/79880b6a

Branch: refs/heads/DSL_SQL
Commit: 79880b6ab71936b6681c581128db87cc69ab6395
Parents: 8f922f7 c0b1fed
Author: Tyler Akidau <ta...@apache.org>
Authored: Mon Aug 7 14:05:04 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Mon Aug 7 14:05:04 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BeamRecordCoder.java |  7 --
 .../org/apache/beam/sdk/values/BeamRecord.java  | 36 +-------
 .../interpreter/BeamSqlExpressionExecutor.java  |  5 +-
 .../sql/impl/interpreter/BeamSqlFnExecutor.java |  5 +-
 .../operator/BeamSqlCaseExpression.java         |  9 +-
 .../operator/BeamSqlCastExpression.java         | 31 +++----
 .../interpreter/operator/BeamSqlExpression.java |  9 +-
 .../operator/BeamSqlInputRefExpression.java     |  3 +-
 .../interpreter/operator/BeamSqlPrimitive.java  |  5 +-
 .../operator/BeamSqlReinterpretExpression.java  |  7 +-
 .../operator/BeamSqlUdfExpression.java          |  5 +-
 .../operator/BeamSqlWindowEndExpression.java    | 12 ++-
 .../operator/BeamSqlWindowExpression.java       |  5 +-
 .../operator/BeamSqlWindowStartExpression.java  | 12 ++-
 .../arithmetic/BeamSqlArithmeticExpression.java |  8 +-
 .../comparison/BeamSqlCompareExpression.java    |  7 +-
 .../comparison/BeamSqlIsNotNullExpression.java  |  5 +-
 .../comparison/BeamSqlIsNullExpression.java     |  5 +-
 .../date/BeamSqlCurrentDateExpression.java      |  3 +-
 .../date/BeamSqlCurrentTimeExpression.java      |  3 +-
 .../date/BeamSqlCurrentTimestampExpression.java |  3 +-
 .../date/BeamSqlDateCeilExpression.java         |  5 +-
 .../date/BeamSqlDateFloorExpression.java        |  5 +-
 .../operator/date/BeamSqlExtractExpression.java |  5 +-
 .../operator/logical/BeamSqlAndExpression.java  |  5 +-
 .../operator/logical/BeamSqlNotExpression.java  |  7 +-
 .../operator/logical/BeamSqlOrExpression.java   |  5 +-
 .../math/BeamSqlMathBinaryExpression.java       |  6 +-
 .../math/BeamSqlMathUnaryExpression.java        |  6 +-
 .../operator/math/BeamSqlPiExpression.java      |  3 +-
 .../operator/math/BeamSqlRandExpression.java    |  5 +-
 .../math/BeamSqlRandIntegerExpression.java      |  7 +-
 .../string/BeamSqlCharLengthExpression.java     |  5 +-
 .../string/BeamSqlConcatExpression.java         |  7 +-
 .../string/BeamSqlInitCapExpression.java        |  5 +-
 .../operator/string/BeamSqlLowerExpression.java |  5 +-
 .../string/BeamSqlOverlayExpression.java        | 11 +--
 .../string/BeamSqlPositionExpression.java       |  9 +-
 .../string/BeamSqlSubstringExpression.java      |  9 +-
 .../operator/string/BeamSqlTrimExpression.java  | 11 +--
 .../operator/string/BeamSqlUpperExpression.java |  5 +-
 .../transform/BeamAggregationTransforms.java    |  7 +-
 .../sql/impl/transform/BeamSqlFilterFn.java     |  5 +-
 .../sql/impl/transform/BeamSqlProjectFn.java    |  3 +-
 .../sql/BeamSqlDslAggregationTest.java          | 17 ----
 .../operator/BeamNullExperssionTest.java        |  8 +-
 .../operator/BeamSqlAndOrExpressionTest.java    |  8 +-
 .../operator/BeamSqlCaseExpressionTest.java     |  6 +-
 .../operator/BeamSqlCastExpressionTest.java     | 24 +++---
 .../operator/BeamSqlCompareExpressionTest.java  | 24 +++---
 .../operator/BeamSqlInputRefExpressionTest.java | 12 +--
 .../operator/BeamSqlPrimitiveTest.java          | 10 +--
 .../BeamSqlReinterpretExpressionTest.java       |  2 +-
 .../operator/BeamSqlUdfExpressionTest.java      |  2 +-
 .../BeamSqlArithmeticExpressionTest.java        | 46 +++++-----
 .../date/BeamSqlCurrentDateExpressionTest.java  |  2 +-
 .../date/BeamSqlCurrentTimeExpressionTest.java  |  2 +-
 .../BeamSqlCurrentTimestampExpressionTest.java  |  2 +-
 .../date/BeamSqlDateCeilExpressionTest.java     |  4 +-
 .../date/BeamSqlDateFloorExpressionTest.java    |  4 +-
 .../date/BeamSqlExtractExpressionTest.java      | 14 +--
 .../logical/BeamSqlNotExpressionTest.java       |  6 +-
 .../math/BeamSqlMathBinaryExpressionTest.java   | 58 ++++++++-----
 .../math/BeamSqlMathUnaryExpressionTest.java    | 91 ++++++++++----------
 .../string/BeamSqlCharLengthExpressionTest.java |  2 +-
 .../string/BeamSqlConcatExpressionTest.java     |  2 +-
 .../string/BeamSqlInitCapExpressionTest.java    |  6 +-
 .../string/BeamSqlLowerExpressionTest.java      |  2 +-
 .../string/BeamSqlOverlayExpressionTest.java    |  8 +-
 .../string/BeamSqlPositionExpressionTest.java   |  6 +-
 .../string/BeamSqlSubstringExpressionTest.java  | 14 +--
 .../string/BeamSqlTrimExpressionTest.java       |  8 +-
 .../string/BeamSqlUpperExpressionTest.java      |  2 +-
 73 files changed, 366 insertions(+), 352 deletions(-)
----------------------------------------------------------------------