You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/07 21:06:10 UTC
[3/4] beam git commit: Remove redundant windowing information from
the BeamRecord itself; element window information (`BoundedWindow`) is
instead passed to `BeamSqlExpression`.
Remove redundant windowing information from the BeamRecord itself.
Element window information (`BoundedWindow`) is instead passed to `BeamSqlExpression` at evaluation time.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0b1fed1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0b1fed1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0b1fed1
Branch: refs/heads/DSL_SQL
Commit: c0b1fed1c7687a1e44e8ee6997be40aa1785ea51
Parents: 8f922f7
Author: mingmxu <mi...@ebay.com>
Authored: Fri Aug 4 11:12:45 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Fri Aug 4 11:41:56 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/BeamRecordCoder.java | 7 --
.../org/apache/beam/sdk/values/BeamRecord.java | 36 +-------
.../interpreter/BeamSqlExpressionExecutor.java | 5 +-
.../sql/impl/interpreter/BeamSqlFnExecutor.java | 5 +-
.../operator/BeamSqlCaseExpression.java | 9 +-
.../operator/BeamSqlCastExpression.java | 31 +++----
.../interpreter/operator/BeamSqlExpression.java | 9 +-
.../operator/BeamSqlInputRefExpression.java | 3 +-
.../interpreter/operator/BeamSqlPrimitive.java | 5 +-
.../operator/BeamSqlReinterpretExpression.java | 7 +-
.../operator/BeamSqlUdfExpression.java | 5 +-
.../operator/BeamSqlWindowEndExpression.java | 12 ++-
.../operator/BeamSqlWindowExpression.java | 5 +-
.../operator/BeamSqlWindowStartExpression.java | 12 ++-
.../arithmetic/BeamSqlArithmeticExpression.java | 8 +-
.../comparison/BeamSqlCompareExpression.java | 7 +-
.../comparison/BeamSqlIsNotNullExpression.java | 5 +-
.../comparison/BeamSqlIsNullExpression.java | 5 +-
.../date/BeamSqlCurrentDateExpression.java | 3 +-
.../date/BeamSqlCurrentTimeExpression.java | 3 +-
.../date/BeamSqlCurrentTimestampExpression.java | 3 +-
.../date/BeamSqlDateCeilExpression.java | 5 +-
.../date/BeamSqlDateFloorExpression.java | 5 +-
.../operator/date/BeamSqlExtractExpression.java | 5 +-
.../operator/logical/BeamSqlAndExpression.java | 5 +-
.../operator/logical/BeamSqlNotExpression.java | 7 +-
.../operator/logical/BeamSqlOrExpression.java | 5 +-
.../math/BeamSqlMathBinaryExpression.java | 6 +-
.../math/BeamSqlMathUnaryExpression.java | 6 +-
.../operator/math/BeamSqlPiExpression.java | 3 +-
.../operator/math/BeamSqlRandExpression.java | 5 +-
.../math/BeamSqlRandIntegerExpression.java | 7 +-
.../string/BeamSqlCharLengthExpression.java | 5 +-
.../string/BeamSqlConcatExpression.java | 7 +-
.../string/BeamSqlInitCapExpression.java | 5 +-
.../operator/string/BeamSqlLowerExpression.java | 5 +-
.../string/BeamSqlOverlayExpression.java | 11 +--
.../string/BeamSqlPositionExpression.java | 9 +-
.../string/BeamSqlSubstringExpression.java | 9 +-
.../operator/string/BeamSqlTrimExpression.java | 11 +--
.../operator/string/BeamSqlUpperExpression.java | 5 +-
.../transform/BeamAggregationTransforms.java | 7 +-
.../sql/impl/transform/BeamSqlFilterFn.java | 5 +-
.../sql/impl/transform/BeamSqlProjectFn.java | 3 +-
.../sql/BeamSqlDslAggregationTest.java | 17 ----
.../operator/BeamNullExperssionTest.java | 8 +-
.../operator/BeamSqlAndOrExpressionTest.java | 8 +-
.../operator/BeamSqlCaseExpressionTest.java | 6 +-
.../operator/BeamSqlCastExpressionTest.java | 24 +++---
.../operator/BeamSqlCompareExpressionTest.java | 24 +++---
.../operator/BeamSqlInputRefExpressionTest.java | 12 +--
.../operator/BeamSqlPrimitiveTest.java | 10 +--
.../BeamSqlReinterpretExpressionTest.java | 2 +-
.../operator/BeamSqlUdfExpressionTest.java | 2 +-
.../BeamSqlArithmeticExpressionTest.java | 46 +++++-----
.../date/BeamSqlCurrentDateExpressionTest.java | 2 +-
.../date/BeamSqlCurrentTimeExpressionTest.java | 2 +-
.../BeamSqlCurrentTimestampExpressionTest.java | 2 +-
.../date/BeamSqlDateCeilExpressionTest.java | 4 +-
.../date/BeamSqlDateFloorExpressionTest.java | 4 +-
.../date/BeamSqlExtractExpressionTest.java | 14 +--
.../logical/BeamSqlNotExpressionTest.java | 6 +-
.../math/BeamSqlMathBinaryExpressionTest.java | 58 ++++++++-----
.../math/BeamSqlMathUnaryExpressionTest.java | 91 ++++++++++----------
.../string/BeamSqlCharLengthExpressionTest.java | 2 +-
.../string/BeamSqlConcatExpressionTest.java | 2 +-
.../string/BeamSqlInitCapExpressionTest.java | 6 +-
.../string/BeamSqlLowerExpressionTest.java | 2 +-
.../string/BeamSqlOverlayExpressionTest.java | 8 +-
.../string/BeamSqlPositionExpressionTest.java | 6 +-
.../string/BeamSqlSubstringExpressionTest.java | 14 +--
.../string/BeamSqlTrimExpressionTest.java | 8 +-
.../string/BeamSqlUpperExpressionTest.java | 2 +-
73 files changed, 366 insertions(+), 352 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index 06958a4..fe9c295 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.BeamRecordType;
@Experimental
public class BeamRecordCoder extends CustomCoder<BeamRecord> {
private static final BitSetCoder nullListCoder = BitSetCoder.of();
- private static final InstantCoder instantCoder = InstantCoder.of();
private BeamRecordType recordType;
private List<Coder> coderArray;
@@ -64,9 +63,6 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
coderArray.get(idx).encode(value.getFieldValue(idx), outStream);
}
-
- instantCoder.encode(value.getWindowStart(), outStream);
- instantCoder.encode(value.getWindowEnd(), outStream);
}
@Override
@@ -82,9 +78,6 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
record.addField(idx, coderArray.get(idx).decode(inStream));
}
- record.setWindowStart(instantCoder.decode(inStream));
- record.setWindowEnd(instantCoder.decode(inStream));
-
return record;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index bac649e..8d0aa42 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -24,11 +24,7 @@ import java.util.BitSet;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.joda.time.Instant;
/**
* {@link org.apache.beam.sdk.values.BeamRecord}, self-described with
@@ -42,9 +38,6 @@ public class BeamRecord implements Serializable {
private BitSet nullFields;
private BeamRecordType dataType;
- private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
- private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
-
public BeamRecord(BeamRecordType dataType) {
this.dataType = dataType;
this.nullFields = new BitSet(dataType.size());
@@ -62,17 +55,6 @@ public class BeamRecord implements Serializable {
}
}
- public void updateWindowRange(BeamRecord upstreamRecord, BoundedWindow window){
- windowStart = upstreamRecord.windowStart;
- windowEnd = upstreamRecord.windowEnd;
-
- if (window instanceof IntervalWindow) {
- IntervalWindow iWindow = (IntervalWindow) window;
- windowStart = iWindow.start();
- windowEnd = iWindow.end();
- }
- }
-
public void addField(String fieldName, Object fieldValue) {
addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
}
@@ -211,26 +193,10 @@ public class BeamRecord implements Serializable {
return nullFields.get(idx);
}
- public Instant getWindowStart() {
- return windowStart;
- }
-
- public Instant getWindowEnd() {
- return windowEnd;
- }
-
- public void setWindowStart(Instant windowStart) {
- this.windowStart = windowStart;
- }
-
- public void setWindowEnd(Instant windowEnd) {
- this.windowEnd = windowEnd;
- }
-
@Override
public String toString() {
return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
- + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
+ + dataType + "]";
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
index 3cd6d65..3aaf505 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter;
import java.io.Serializable;
import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
/**
@@ -34,10 +35,10 @@ public interface BeamSqlExpressionExecutor extends Serializable {
void prepare();
/**
- * apply transformation to input record {@link BeamRecord}.
+ * apply transformation to input record {@link BeamRecord} with {@link BoundedWindow}.
*
*/
- List<Object> execute(BeamRecord inputRow);
+ List<Object> execute(BeamRecord inputRow, BoundedWindow window);
void close();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 0f77ed8..8f9797b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -88,6 +88,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamS
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
@@ -427,10 +428,10 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
}
@Override
- public List<Object> execute(BeamRecord inputRow) {
+ public List<Object> execute(BeamRecord inputRow, BoundedWindow window) {
List<Object> results = new ArrayList<>();
for (BeamSqlExpression exp : exps) {
- results.add(exp.evaluate(inputRow).getValue());
+ results.add(exp.evaluate(inputRow, window).getValue());
}
return results;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
index af48cbe..955444f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
@@ -19,6 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -48,16 +49,16 @@ public class BeamSqlCaseExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
for (int i = 0; i < operands.size() - 1; i += 2) {
- if (opValueEvaluated(i, inputRow)) {
+ if (opValueEvaluated(i, inputRow, window)) {
return BeamSqlPrimitive.of(
outputType,
- opValueEvaluated(i + 1, inputRow)
+ opValueEvaluated(i + 1, inputRow, window)
);
}
}
return BeamSqlPrimitive.of(outputType,
- opValueEvaluated(operands.size() - 1, inputRow));
+ opValueEvaluated(operands.size() - 1, inputRow, window));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
index 3786281..9ea66c1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.runtime.SqlFunctions;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -71,40 +72,40 @@ public class BeamSqlCastExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+ public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
SqlTypeName castOutputType = getOutputType();
switch (castOutputType) {
case INTEGER:
return BeamSqlPrimitive
- .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
+ .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow, window)));
case DOUBLE:
- return BeamSqlPrimitive
- .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
+ return BeamSqlPrimitive.of(SqlTypeName.DOUBLE,
+ SqlFunctions.toDouble(opValueEvaluated(index, inputRow, window)));
case SMALLINT:
- return BeamSqlPrimitive
- .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
+ return BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
+ SqlFunctions.toShort(opValueEvaluated(index, inputRow, window)));
case TINYINT:
- return BeamSqlPrimitive
- .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
+ return BeamSqlPrimitive.of(SqlTypeName.TINYINT,
+ SqlFunctions.toByte(opValueEvaluated(index, inputRow, window)));
case BIGINT:
return BeamSqlPrimitive
- .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
+ .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow, window)));
case DECIMAL:
return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
- SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
+ SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow, window)));
case FLOAT:
return BeamSqlPrimitive
- .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
+ .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow, window)));
case CHAR:
case VARCHAR:
return BeamSqlPrimitive
- .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
+ .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow, window).toString());
case DATE:
- return BeamSqlPrimitive
- .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
+ return BeamSqlPrimitive.of(SqlTypeName.DATE,
+ toDate(opValueEvaluated(index, inputRow, window), outputDateFormat));
case TIMESTAMP:
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
- toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
+ toTimeStamp(opValueEvaluated(index, inputRow, window), outputTimestampFormat));
}
throw new UnsupportedOperationException(
String.format("Cast to type %s not supported", castOutputType));
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
index f42a365..d18b141 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.io.Serializable;
import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -49,8 +50,8 @@ public abstract class BeamSqlExpression implements Serializable {
return op(idx).getOutputType();
}
- public <T> T opValueEvaluated(int idx, BeamRecord row) {
- return (T) op(idx).evaluate(row).getValue();
+ public <T> T opValueEvaluated(int idx, BeamRecord row, BoundedWindow window) {
+ return (T) op(idx).evaluate(row, window).getValue();
}
/**
@@ -59,10 +60,10 @@ public abstract class BeamSqlExpression implements Serializable {
public abstract boolean accept();
/**
- * Apply input record {@link BeamRecord} to this expression,
+ * Apply input record {@link BeamRecord} with {@link BoundedWindow} to this expression,
* the output value is wrapped with {@link BeamSqlPrimitive}.
*/
- public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow);
+ public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window);
public List<BeamSqlExpression> getOperands() {
return operands;
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
index 8c3d4d4..a2d1624 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -37,7 +38,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+ public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index f763898..9175caa 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -21,13 +21,14 @@ import java.math.BigDecimal;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
/**
* {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
- * It holds the value, and return it directly during {@link #evaluate(BeamRecord)}.
+ * It holds the value, and return it directly during {@link #evaluate(BeamRecord, BoundedWindow)}.
*
*/
public class BeamSqlPrimitive<T> extends BeamSqlExpression {
@@ -145,7 +146,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<T> evaluate(BeamRecord inputRow) {
+ public BeamSqlPrimitive<T> evaluate(BeamRecord inputRow, BoundedWindow window) {
return this;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
index c1fa2c7..2ec4fb5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -41,13 +42,13 @@ public class BeamSqlReinterpretExpression extends BeamSqlExpression {
&& SqlTypeName.DATETIME_TYPES.contains(opType(0));
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
if (opType(0) == SqlTypeName.TIME) {
- GregorianCalendar date = opValueEvaluated(0, inputRow);
+ GregorianCalendar date = opValueEvaluated(0, inputRow, window);
return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
} else {
- Date date = opValueEvaluated(0, inputRow);
+ Date date = opValueEvaluated(0, inputRow, window);
return BeamSqlPrimitive.of(outputType, date.getTime());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
index da706f3..f1bcb66 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -51,14 +52,14 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+ public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
if (method == null) {
reConstructMethod();
}
try {
List<Object> paras = new ArrayList<>();
for (BeamSqlExpression e : getOperands()) {
- paras.add(e.evaluate(inputRow).getValue());
+ paras.add(e.evaluate(inputRow, window).getValue());
}
return BeamSqlPrimitive.of(getOutputType(),
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
index 2f4c165..919612e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.util.Date;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -34,9 +36,13 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
- return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
- new Date(inputRow.getWindowEnd().getMillis()));
+ public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow, BoundedWindow window) {
+ if (window instanceof IntervalWindow) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, ((IntervalWindow) window).end().toDate());
+ } else {
+ throw new UnsupportedOperationException(
+ "Cannot run HOP_END|TUMBLE_END|SESSION_END on GlobalWindow.");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
index 2f3dd5c..0298f26 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.util.Date;
import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -42,9 +43,9 @@ public class BeamSqlWindowExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
+ public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow, BoundedWindow window) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
- (Date) operands.get(0).evaluate(inputRow).getValue());
+ (Date) operands.get(0).evaluate(inputRow, window).getValue());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
index 9186ec0..4b250a5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
import java.util.Date;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -35,9 +37,13 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
- return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
- new Date(inputRow.getWindowStart().getMillis()));
+ public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow, BoundedWindow window) {
+ if (window instanceof IntervalWindow) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, ((IntervalWindow) window).start().toDate());
+ } else {
+ throw new UnsupportedOperationException(
+ "Cannot run HOP_START|TUMBLE_START|SESSION_START on GlobalWindow.");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index fd36457..cc15ff5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -50,11 +51,12 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
super(operands, outputType);
}
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow,
+ BoundedWindow window) {
BigDecimal left = BigDecimal.valueOf(
- Double.valueOf(opValueEvaluated(0, inputRow).toString()));
+ Double.valueOf(opValueEvaluated(0, inputRow, window).toString()));
BigDecimal right = BigDecimal.valueOf(
- Double.valueOf(opValueEvaluated(1, inputRow).toString()));
+ Double.valueOf(opValueEvaluated(1, inputRow, window).toString()));
BigDecimal result = calc(left, right);
return getCorrectlyTypedResult(result);
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
index 93032ae..df8bd61 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -51,9 +52,9 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
- Object leftValue = operands.get(0).evaluate(inputRow).getValue();
- Object rightValue = operands.get(1).evaluate(inputRow).getValue();
+ public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow, BoundedWindow window) {
+ Object leftValue = operands.get(0).evaluate(inputRow, window).getValue();
+ Object rightValue = operands.get(1).evaluate(inputRow, window).getValue();
switch (operands.get(0).getOutputType()) {
case BIGINT:
case DECIMAL:
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
index 7177d96..9a9739e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -46,8 +47,8 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
- Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+ public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow, BoundedWindow window) {
+ Object leftValue = operands.get(0).evaluate(inputRow, window).getValue();
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
index c74fcd9..6034344 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -46,8 +47,8 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
- Object leftValue = operands.get(0).evaluate(inputRow).getValue();
+ public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow, BoundedWindow window) {
+ Object leftValue = operands.get(0).evaluate(inputRow, window).getValue();
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
index 86abe43..336772d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Date;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -38,7 +39,7 @@ public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
return getOperands().size() == 0;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
return BeamSqlPrimitive.of(outputType, new Date());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
index d8de464..fe3feb8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.TimeZone;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -44,7 +45,7 @@ public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
return opCount <= 1;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault());
ret.setTime(new Date());
return BeamSqlPrimitive.of(outputType, ret);
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
index 4736571..ca4b3ce 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java
@@ -22,6 +22,7 @@ import java.util.Date;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -42,7 +43,7 @@ public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression {
return opCount <= 1;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
return BeamSqlPrimitive.of(outputType, new Date());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
index 55767fa..0e1d3db 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -22,6 +22,7 @@ import java.util.Date;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnitRange;
@@ -41,8 +42,8 @@ public class BeamSqlDateCeilExpression extends BeamSqlExpression {
&& opType(1) == SqlTypeName.SYMBOL;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- Date date = opValueEvaluated(0, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ Date date = opValueEvaluated(0, inputRow, window);
long time = date.getTime();
TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
index 3310da5..2593629 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -22,6 +22,7 @@ import java.util.Date;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnitRange;
@@ -41,8 +42,8 @@ public class BeamSqlDateFloorExpression extends BeamSqlExpression {
&& opType(1) == SqlTypeName.SYMBOL;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- Date date = opValueEvaluated(0, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ Date date = opValueEvaluated(0, inputRow, window);
long time = date.getTime();
TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
index 47cd879..38afd0a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnitRange;
@@ -61,8 +62,8 @@ public class BeamSqlExtractExpression extends BeamSqlExpression {
&& opType(1) == SqlTypeName.BIGINT;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- Long time = opValueEvaluated(1, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ Long time = opValueEvaluated(1, inputRow, window);
TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
index b8964d5..2cae22b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -32,10 +33,10 @@ public class BeamSqlAndExpression extends BeamSqlLogicalExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow, BoundedWindow window) {
boolean result = true;
for (BeamSqlExpression exp : operands) {
- BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
+ BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow, window);
result = result && expOut.getValue();
if (!result) {
break;
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
index f9578b9..72a6982 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -42,10 +43,10 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
return super.accept();
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- Boolean value = opValueEvaluated(0, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ Boolean value = opValueEvaluated(0, inputRow, window);
if (value == null) {
- return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);
+ return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null); // NOT of NULL stays NULL
} else {
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, !value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
index 88a3916..74dde7a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -32,10 +33,10 @@ public class BeamSqlOrExpression extends BeamSqlLogicalExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamRecord inputRow, BoundedWindow window) {
boolean result = false;
for (BeamSqlExpression exp : operands) {
- BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow);
+ BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow, window);
result = result || expOut.getValue();
if (result) {
break;
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
index 8f6c00c..ed0aac0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -38,10 +39,11 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression {
return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && isOperandNumeric(opType(1));
}
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow,
+ BoundedWindow window) {
BeamSqlExpression leftOp = op(0);
BeamSqlExpression rightOp = op(1);
- return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow));
+ return calculate(leftOp.evaluate(inputRow, window), rightOp.evaluate(inputRow, window));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
index b225b8e..b1a210e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -45,9 +46,10 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression {
return acceptance;
}
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamRecord inputRow,
+ BoundedWindow window) {
BeamSqlExpression operand = op(0);
- return calculate(operand.evaluate(inputRow));
+ return calculate(operand.evaluate(inputRow, window));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
index 676f859..3072ea0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -36,7 +37,7 @@ public class BeamSqlPiExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
index 0575978..00f2693 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Random;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -42,9 +43,9 @@ public class BeamSqlRandExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamRecord inputRecord) {
+ public BeamSqlPrimitive evaluate(BeamRecord inputRecord, BoundedWindow window) {
if (operands.size() == 1) {
- int rowSeed = opValueEvaluated(0, inputRecord);
+ int rowSeed = opValueEvaluated(0, inputRecord, window);
if (seed == null || seed != rowSeed) {
rand.setSeed(rowSeed);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
index 52f0cc1..d055de6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Random;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -43,16 +44,16 @@ public class BeamSqlRandIntegerExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamRecord inputRecord) {
+ public BeamSqlPrimitive evaluate(BeamRecord inputRecord, BoundedWindow window) {
int numericIdx = 0;
if (operands.size() == 2) {
- int rowSeed = opValueEvaluated(0, inputRecord);
+ int rowSeed = opValueEvaluated(0, inputRecord, window);
if (seed == null || seed != rowSeed) {
rand.setSeed(rowSeed);
}
numericIdx = 1;
}
return BeamSqlPrimitive.of(SqlTypeName.INTEGER,
- rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord)));
+ rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord, window)));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
index 974e2bc..5146b14 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -32,8 +33,8 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.INTEGER);
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- String str = opValueEvaluated(0, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ String str = opValueEvaluated(0, inputRow, window);
return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
index 14ef55d..c2f317f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -51,9 +52,9 @@ public class BeamSqlConcatExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- String left = opValueEvaluated(0, inputRow);
- String right = opValueEvaluated(1, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ String left = opValueEvaluated(0, inputRow, window);
+ String right = opValueEvaluated(1, inputRow, window);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
new StringBuilder(left.length() + right.length())
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
index e50872b..bf0b8f5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -32,8 +33,8 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- String str = opValueEvaluated(0, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ String str = opValueEvaluated(0, inputRow, window);
StringBuilder ret = new StringBuilder(str);
boolean isInit = true;
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
index 0f9a501..55f8d6d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -32,8 +33,8 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- String str = opValueEvaluated(0, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ String str = opValueEvaluated(0, inputRow, window);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
index 2336876..62d5a64 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -54,15 +55,15 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- String str = opValueEvaluated(0, inputRow);
- String replaceStr = opValueEvaluated(1, inputRow);
- int idx = opValueEvaluated(2, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ String str = opValueEvaluated(0, inputRow, window);
+ String replaceStr = opValueEvaluated(1, inputRow, window);
+ int idx = opValueEvaluated(2, inputRow, window);
// the index is 1 based.
idx -= 1;
int length = replaceStr.length();
if (operands.size() == 4) {
- length = opValueEvaluated(3, inputRow);
+ length = opValueEvaluated(3, inputRow, window);
}
StringBuilder result = new StringBuilder(
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
index 06dce91..f97547e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -56,12 +57,12 @@ public class BeamSqlPositionExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- String targetStr = opValueEvaluated(0, inputRow);
- String containingStr = opValueEvaluated(1, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ String targetStr = opValueEvaluated(0, inputRow, window);
+ String containingStr = opValueEvaluated(1, inputRow, window);
int from = -1;
if (operands.size() == 3) {
- Number tmp = opValueEvaluated(2, inputRow);
+ Number tmp = opValueEvaluated(2, inputRow, window);
from = tmp.intValue();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
index f8582aa..a521ef0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -54,9 +55,9 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- String str = opValueEvaluated(0, inputRow);
- int idx = opValueEvaluated(1, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ String str = opValueEvaluated(0, inputRow, window);
+ int idx = opValueEvaluated(1, inputRow, window);
int startIdx = idx;
if (startIdx > 0) {
// NOTE: SQL substring is 1 based(rather than 0 based)
@@ -69,7 +70,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
}
if (operands.size() == 3) {
- int length = opValueEvaluated(2, inputRow);
+ int length = opValueEvaluated(2, inputRow, window);
if (length < 0) {
length = 0;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
index 9c2a7ae..3c3083c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.fun.SqlTrimFunction;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -58,14 +59,14 @@ public class BeamSqlTrimExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
if (operands.size() == 1) {
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
- opValueEvaluated(0, inputRow).toString().trim());
+ opValueEvaluated(0, inputRow, window).toString().trim());
} else {
- SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
- String targetStr = opValueEvaluated(1, inputRow);
- String containingStr = opValueEvaluated(2, inputRow);
+ SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow, window);
+ String targetStr = opValueEvaluated(1, inputRow, window);
+ String containingStr = opValueEvaluated(2, inputRow, window);
switch (type) {
case LEADING:
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
index 94ac2e2..bc29ec8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -32,8 +33,8 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
- String str = opValueEvaluated(0, inputRow);
+ @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
+ String str = opValueEvaluated(0, inputRow, window);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index dab79a2..ce5444f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.calcite.rel.core.AggregateCall;
@@ -75,7 +76,6 @@ public class BeamAggregationTransforms implements Serializable{
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
BeamRecord outRecord = new BeamRecord(outRowType);
- outRecord.updateWindowRange(c.element().getKey(), window);
KV<BeamRecord, BeamRecord> kvRecord = c.element();
for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
@@ -85,7 +85,7 @@ public class BeamAggregationTransforms implements Serializable{
outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx));
}
if (windowStartFieldIdx != -1) {
- outRecord.addField(windowStartFieldIdx, outRecord.getWindowStart().toDate());
+ outRecord.addField(windowStartFieldIdx, ((IntervalWindow) window).start().toDate());
}
c.output(outRecord);
@@ -112,7 +112,6 @@ public class BeamAggregationTransforms implements Serializable{
public BeamRecord apply(BeamRecord input) {
BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
BeamRecord keyOfRecord = new BeamRecord(typeOfKey);
- keyOfRecord.updateWindowRange(input, null);
for (int idx = 0; idx < groupByKeys.size(); ++idx) {
keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
@@ -223,7 +222,7 @@ public class BeamAggregationTransforms implements Serializable{
for (int idx = 0; idx < aggregators.size(); ++idx) {
deltaAcc.accumulatorElements.add(
aggregators.get(idx).add(accumulator.accumulatorElements.get(idx),
- sourceFieldExps.get(idx).evaluate(input).getValue()));
+ sourceFieldExps.get(idx).evaluate(input, null).getValue()));
}
return deltaAcc;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
index 31efeb7..d3a3f7b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
/**
@@ -44,10 +45,10 @@ public class BeamSqlFilterFn extends DoFn<BeamRecord, BeamRecord> {
}
@ProcessElement
- public void processElement(ProcessContext c) {
+ public void processElement(ProcessContext c, BoundedWindow window) {
BeamRecord in = c.element();
- List<Object> result = executor.execute(in);
+ List<Object> result = executor.execute(in, window);
if ((Boolean) result.get(0)) {
c.output(in);