You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:21 UTC
[58/59] beam git commit: move all implementation classes/packages
into impl package
move all implementation classes/packages into impl package
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/febd044a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/febd044a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/febd044a
Branch: refs/heads/DSL_SQL
Commit: febd044ae306a28fa3797a1663e54c1d7fbe43ce
Parents: c1b5482
Author: James Xu <xu...@gmail.com>
Authored: Mon Jul 31 17:11:53 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Mon Jul 31 17:11:53 2017 +0800
----------------------------------------------------------------------
.../apache/beam/sdk/extensions/sql/BeamSql.java | 2 +-
.../beam/sdk/extensions/sql/BeamSqlCli.java | 2 +-
.../beam/sdk/extensions/sql/BeamSqlEnv.java | 4 +-
.../interpreter/BeamSqlExpressionExecutor.java | 43 ++
.../sql/impl/interpreter/BeamSqlFnExecutor.java | 442 +++++++++++++++++++
.../operator/BeamSqlCaseExpression.java | 63 +++
.../operator/BeamSqlCastExpression.java | 131 ++++++
.../interpreter/operator/BeamSqlExpression.java | 78 ++++
.../operator/BeamSqlInputRefExpression.java | 43 ++
.../interpreter/operator/BeamSqlPrimitive.java | 152 +++++++
.../operator/BeamSqlReinterpretExpression.java | 54 +++
.../operator/BeamSqlUdfExpression.java | 86 ++++
.../operator/BeamSqlWindowEndExpression.java | 42 ++
.../operator/BeamSqlWindowExpression.java | 50 +++
.../operator/BeamSqlWindowStartExpression.java | 43 ++
.../arithmetic/BeamSqlArithmeticExpression.java | 122 +++++
.../arithmetic/BeamSqlDivideExpression.java | 37 ++
.../arithmetic/BeamSqlMinusExpression.java | 36 ++
.../arithmetic/BeamSqlModExpression.java | 36 ++
.../arithmetic/BeamSqlMultiplyExpression.java | 36 ++
.../arithmetic/BeamSqlPlusExpression.java | 36 ++
.../operator/arithmetic/package-info.java | 22 +
.../comparison/BeamSqlCompareExpression.java | 96 ++++
.../comparison/BeamSqlEqualsExpression.java | 49 ++
.../BeamSqlGreaterThanExpression.java | 49 ++
.../BeamSqlGreaterThanOrEqualsExpression.java | 49 ++
.../comparison/BeamSqlIsNotNullExpression.java | 53 +++
.../comparison/BeamSqlIsNullExpression.java | 53 +++
.../comparison/BeamSqlLessThanExpression.java | 49 ++
.../BeamSqlLessThanOrEqualsExpression.java | 49 ++
.../comparison/BeamSqlNotEqualsExpression.java | 49 ++
.../operator/comparison/package-info.java | 22 +
.../date/BeamSqlCurrentDateExpression.java | 44 ++
.../date/BeamSqlCurrentTimeExpression.java | 52 +++
.../date/BeamSqlCurrentTimestampExpression.java | 48 ++
.../date/BeamSqlDateCeilExpression.java | 54 +++
.../date/BeamSqlDateFloorExpression.java | 54 +++
.../operator/date/BeamSqlExtractExpression.java | 101 +++++
.../interpreter/operator/date/package-info.java | 22 +
.../operator/logical/BeamSqlAndExpression.java | 47 ++
.../logical/BeamSqlLogicalExpression.java | 46 ++
.../operator/logical/BeamSqlNotExpression.java | 53 +++
.../operator/logical/BeamSqlOrExpression.java | 47 ++
.../operator/logical/package-info.java | 22 +
.../operator/math/BeamSqlAbsExpression.java | 74 ++++
.../operator/math/BeamSqlAcosExpression.java | 40 ++
.../operator/math/BeamSqlAsinExpression.java | 40 ++
.../operator/math/BeamSqlAtan2Expression.java | 42 ++
.../operator/math/BeamSqlAtanExpression.java | 40 ++
.../operator/math/BeamSqlCeilExpression.java | 45 ++
.../operator/math/BeamSqlCosExpression.java | 40 ++
.../operator/math/BeamSqlCotExpression.java | 40 ++
.../operator/math/BeamSqlDegreesExpression.java | 40 ++
.../operator/math/BeamSqlExpExpression.java | 40 ++
.../operator/math/BeamSqlFloorExpression.java | 45 ++
.../operator/math/BeamSqlLnExpression.java | 40 ++
.../operator/math/BeamSqlLogExpression.java | 40 ++
.../math/BeamSqlMathBinaryExpression.java | 63 +++
.../math/BeamSqlMathUnaryExpression.java | 58 +++
.../operator/math/BeamSqlPiExpression.java | 42 ++
.../operator/math/BeamSqlPowerExpression.java | 44 ++
.../operator/math/BeamSqlRadiansExpression.java | 40 ++
.../operator/math/BeamSqlRandExpression.java | 54 +++
.../math/BeamSqlRandIntegerExpression.java | 58 +++
.../operator/math/BeamSqlRoundExpression.java | 107 +++++
.../operator/math/BeamSqlSignExpression.java | 72 +++
.../operator/math/BeamSqlSinExpression.java | 40 ++
.../operator/math/BeamSqlTanExpression.java | 40 ++
.../math/BeamSqlTruncateExpression.java | 75 ++++
.../interpreter/operator/math/package-info.java | 22 +
.../impl/interpreter/operator/package-info.java | 22 +
.../string/BeamSqlCharLengthExpression.java | 39 ++
.../string/BeamSqlConcatExpression.java | 62 +++
.../string/BeamSqlInitCapExpression.java | 55 +++
.../operator/string/BeamSqlLowerExpression.java | 39 ++
.../string/BeamSqlOverlayExpression.java | 76 ++++
.../string/BeamSqlPositionExpression.java | 72 +++
.../string/BeamSqlStringUnaryExpression.java | 44 ++
.../string/BeamSqlSubstringExpression.java | 82 ++++
.../operator/string/BeamSqlTrimExpression.java | 101 +++++
.../operator/string/BeamSqlUpperExpression.java | 39 ++
.../operator/string/package-info.java | 22 +
.../sql/impl/interpreter/package-info.java | 22 +
.../sql/impl/planner/BeamQueryPlanner.java | 167 +++++++
.../sql/impl/planner/BeamRelDataTypeSystem.java | 40 ++
.../sql/impl/planner/BeamRuleSets.java | 75 ++++
.../sql/impl/planner/package-info.java | 24 +
.../sql/impl/rel/BeamAggregationRel.java | 182 ++++++++
.../extensions/sql/impl/rel/BeamFilterRel.java | 70 +++
.../extensions/sql/impl/rel/BeamIOSinkRel.java | 75 ++++
.../sql/impl/rel/BeamIOSourceRel.java | 63 +++
.../sql/impl/rel/BeamIntersectRel.java | 58 +++
.../extensions/sql/impl/rel/BeamJoinRel.java | 302 +++++++++++++
.../sql/impl/rel/BeamLogicalConvention.java | 72 +++
.../extensions/sql/impl/rel/BeamMinusRel.java | 56 +++
.../extensions/sql/impl/rel/BeamProjectRel.java | 81 ++++
.../extensions/sql/impl/rel/BeamRelNode.java | 38 ++
.../sql/impl/rel/BeamSetOperatorRelBase.java | 98 ++++
.../extensions/sql/impl/rel/BeamSortRel.java | 247 +++++++++++
.../sql/impl/rel/BeamSqlRelUtils.java | 72 +++
.../extensions/sql/impl/rel/BeamUnionRel.java | 88 ++++
.../extensions/sql/impl/rel/BeamValuesRel.java | 79 ++++
.../extensions/sql/impl/rel/package-info.java | 23 +
.../sql/impl/rule/BeamAggregationRule.java | 162 +++++++
.../sql/impl/rule/BeamFilterRule.java | 49 ++
.../sql/impl/rule/BeamIOSinkRule.java | 81 ++++
.../sql/impl/rule/BeamIOSourceRule.java | 49 ++
.../sql/impl/rule/BeamIntersectRule.java | 50 +++
.../extensions/sql/impl/rule/BeamJoinRule.java | 53 +++
.../extensions/sql/impl/rule/BeamMinusRule.java | 50 +++
.../sql/impl/rule/BeamProjectRule.java | 50 +++
.../extensions/sql/impl/rule/BeamSortRule.java | 51 +++
.../extensions/sql/impl/rule/BeamUnionRule.java | 50 +++
.../sql/impl/rule/BeamValuesRule.java | 48 ++
.../extensions/sql/impl/rule/package-info.java | 23 +
.../transform/BeamAggregationTransforms.java | 300 +++++++++++++
.../impl/transform/BeamBuiltinAggregations.java | 412 +++++++++++++++++
.../sql/impl/transform/BeamJoinTransforms.java | 166 +++++++
.../transform/BeamSetOperatorsTransforms.java | 111 +++++
.../sql/impl/transform/BeamSqlFilterFn.java | 62 +++
.../transform/BeamSqlOutputToConsoleFn.java | 41 ++
.../sql/impl/transform/BeamSqlProjectFn.java | 72 +++
.../sql/impl/transform/package-info.java | 22 +
.../extensions/sql/impl/utils/CalciteUtils.java | 113 +++++
.../extensions/sql/impl/utils/package-info.java | 22 +
.../interpreter/BeamSqlExpressionExecutor.java | 43 --
.../sql/interpreter/BeamSqlFnExecutor.java | 442 -------------------
.../operator/BeamSqlCaseExpression.java | 63 ---
.../operator/BeamSqlCastExpression.java | 131 ------
.../interpreter/operator/BeamSqlExpression.java | 78 ----
.../operator/BeamSqlInputRefExpression.java | 43 --
.../interpreter/operator/BeamSqlPrimitive.java | 152 -------
.../operator/BeamSqlReinterpretExpression.java | 54 ---
.../operator/BeamSqlUdfExpression.java | 86 ----
.../operator/BeamSqlWindowEndExpression.java | 42 --
.../operator/BeamSqlWindowExpression.java | 50 ---
.../operator/BeamSqlWindowStartExpression.java | 43 --
.../arithmetic/BeamSqlArithmeticExpression.java | 122 -----
.../arithmetic/BeamSqlDivideExpression.java | 37 --
.../arithmetic/BeamSqlMinusExpression.java | 36 --
.../arithmetic/BeamSqlModExpression.java | 36 --
.../arithmetic/BeamSqlMultiplyExpression.java | 36 --
.../arithmetic/BeamSqlPlusExpression.java | 36 --
.../operator/arithmetic/package-info.java | 22 -
.../comparison/BeamSqlCompareExpression.java | 96 ----
.../comparison/BeamSqlEqualsExpression.java | 49 --
.../BeamSqlGreaterThanExpression.java | 49 --
.../BeamSqlGreaterThanOrEqualsExpression.java | 49 --
.../comparison/BeamSqlIsNotNullExpression.java | 53 ---
.../comparison/BeamSqlIsNullExpression.java | 53 ---
.../comparison/BeamSqlLessThanExpression.java | 49 --
.../BeamSqlLessThanOrEqualsExpression.java | 49 --
.../comparison/BeamSqlNotEqualsExpression.java | 49 --
.../operator/comparison/package-info.java | 22 -
.../date/BeamSqlCurrentDateExpression.java | 44 --
.../date/BeamSqlCurrentTimeExpression.java | 52 ---
.../date/BeamSqlCurrentTimestampExpression.java | 48 --
.../date/BeamSqlDateCeilExpression.java | 54 ---
.../date/BeamSqlDateFloorExpression.java | 54 ---
.../operator/date/BeamSqlExtractExpression.java | 101 -----
.../interpreter/operator/date/package-info.java | 22 -
.../operator/logical/BeamSqlAndExpression.java | 47 --
.../logical/BeamSqlLogicalExpression.java | 46 --
.../operator/logical/BeamSqlNotExpression.java | 53 ---
.../operator/logical/BeamSqlOrExpression.java | 47 --
.../operator/logical/package-info.java | 22 -
.../operator/math/BeamSqlAbsExpression.java | 74 ----
.../operator/math/BeamSqlAcosExpression.java | 40 --
.../operator/math/BeamSqlAsinExpression.java | 40 --
.../operator/math/BeamSqlAtan2Expression.java | 42 --
.../operator/math/BeamSqlAtanExpression.java | 40 --
.../operator/math/BeamSqlCeilExpression.java | 45 --
.../operator/math/BeamSqlCosExpression.java | 40 --
.../operator/math/BeamSqlCotExpression.java | 40 --
.../operator/math/BeamSqlDegreesExpression.java | 40 --
.../operator/math/BeamSqlExpExpression.java | 40 --
.../operator/math/BeamSqlFloorExpression.java | 45 --
.../operator/math/BeamSqlLnExpression.java | 40 --
.../operator/math/BeamSqlLogExpression.java | 40 --
.../math/BeamSqlMathBinaryExpression.java | 63 ---
.../math/BeamSqlMathUnaryExpression.java | 58 ---
.../operator/math/BeamSqlPiExpression.java | 42 --
.../operator/math/BeamSqlPowerExpression.java | 44 --
.../operator/math/BeamSqlRadiansExpression.java | 40 --
.../operator/math/BeamSqlRandExpression.java | 54 ---
.../math/BeamSqlRandIntegerExpression.java | 58 ---
.../operator/math/BeamSqlRoundExpression.java | 107 -----
.../operator/math/BeamSqlSignExpression.java | 72 ---
.../operator/math/BeamSqlSinExpression.java | 40 --
.../operator/math/BeamSqlTanExpression.java | 40 --
.../math/BeamSqlTruncateExpression.java | 75 ----
.../interpreter/operator/math/package-info.java | 22 -
.../sql/interpreter/operator/package-info.java | 22 -
.../string/BeamSqlCharLengthExpression.java | 39 --
.../string/BeamSqlConcatExpression.java | 62 ---
.../string/BeamSqlInitCapExpression.java | 55 ---
.../operator/string/BeamSqlLowerExpression.java | 39 --
.../string/BeamSqlOverlayExpression.java | 76 ----
.../string/BeamSqlPositionExpression.java | 72 ---
.../string/BeamSqlStringUnaryExpression.java | 44 --
.../string/BeamSqlSubstringExpression.java | 82 ----
.../operator/string/BeamSqlTrimExpression.java | 101 -----
.../operator/string/BeamSqlUpperExpression.java | 39 --
.../operator/string/package-info.java | 22 -
.../sql/interpreter/package-info.java | 22 -
.../sql/planner/BeamQueryPlanner.java | 167 -------
.../sql/planner/BeamRelDataTypeSystem.java | 40 --
.../extensions/sql/planner/BeamRuleSets.java | 75 ----
.../extensions/sql/planner/package-info.java | 24 -
.../extensions/sql/rel/BeamAggregationRel.java | 182 --------
.../sdk/extensions/sql/rel/BeamFilterRel.java | 70 ---
.../sdk/extensions/sql/rel/BeamIOSinkRel.java | 75 ----
.../sdk/extensions/sql/rel/BeamIOSourceRel.java | 63 ---
.../extensions/sql/rel/BeamIntersectRel.java | 58 ---
.../sdk/extensions/sql/rel/BeamJoinRel.java | 302 -------------
.../sql/rel/BeamLogicalConvention.java | 72 ---
.../sdk/extensions/sql/rel/BeamMinusRel.java | 56 ---
.../sdk/extensions/sql/rel/BeamProjectRel.java | 81 ----
.../sdk/extensions/sql/rel/BeamRelNode.java | 38 --
.../sql/rel/BeamSetOperatorRelBase.java | 98 ----
.../sdk/extensions/sql/rel/BeamSortRel.java | 247 -----------
.../sdk/extensions/sql/rel/BeamSqlRelUtils.java | 72 ---
.../sdk/extensions/sql/rel/BeamUnionRel.java | 88 ----
.../sdk/extensions/sql/rel/BeamValuesRel.java | 79 ----
.../sdk/extensions/sql/rel/package-info.java | 23 -
.../sql/rule/BeamAggregationRule.java | 162 -------
.../sdk/extensions/sql/rule/BeamFilterRule.java | 49 --
.../sdk/extensions/sql/rule/BeamIOSinkRule.java | 81 ----
.../extensions/sql/rule/BeamIOSourceRule.java | 49 --
.../extensions/sql/rule/BeamIntersectRule.java | 50 ---
.../sdk/extensions/sql/rule/BeamJoinRule.java | 53 ---
.../sdk/extensions/sql/rule/BeamMinusRule.java | 50 ---
.../extensions/sql/rule/BeamProjectRule.java | 50 ---
.../sdk/extensions/sql/rule/BeamSortRule.java | 51 ---
.../sdk/extensions/sql/rule/BeamUnionRule.java | 50 ---
.../sdk/extensions/sql/rule/BeamValuesRule.java | 48 --
.../sdk/extensions/sql/rule/package-info.java | 23 -
.../sdk/extensions/sql/schema/BeamSqlRow.java | 2 +-
.../extensions/sql/schema/BeamSqlRowCoder.java | 2 +-
.../extensions/sql/schema/BeamTableUtils.java | 2 +-
.../transform/BeamAggregationTransforms.java | 300 -------------
.../sql/transform/BeamBuiltinAggregations.java | 412 -----------------
.../sql/transform/BeamJoinTransforms.java | 166 -------
.../transform/BeamSetOperatorsTransforms.java | 111 -----
.../sql/transform/BeamSqlFilterFn.java | 62 ---
.../sql/transform/BeamSqlOutputToConsoleFn.java | 41 --
.../sql/transform/BeamSqlProjectFn.java | 72 ---
.../extensions/sql/transform/package-info.java | 22 -
.../sdk/extensions/sql/utils/CalciteUtils.java | 113 -----
.../sdk/extensions/sql/utils/package-info.java | 22 -
.../sdk/extensions/sql/BeamSqlDslJoinTest.java | 4 +-
.../impl/interpreter/BeamSqlFnExecutorTest.java | 416 +++++++++++++++++
.../interpreter/BeamSqlFnExecutorTestBase.java | 92 ++++
.../operator/BeamNullExperssionTest.java | 55 +++
.../operator/BeamSqlAndOrExpressionTest.java | 61 +++
.../operator/BeamSqlCaseExpressionTest.java | 93 ++++
.../operator/BeamSqlCastExpressionTest.java | 125 ++++++
.../operator/BeamSqlCompareExpressionTest.java | 115 +++++
.../operator/BeamSqlInputRefExpressionTest.java | 57 +++
.../operator/BeamSqlPrimitiveTest.java | 59 +++
.../BeamSqlReinterpretExpressionTest.java | 75 ++++
.../operator/BeamSqlUdfExpressionTest.java | 51 +++
.../BeamSqlArithmeticExpressionTest.java | 237 ++++++++++
.../date/BeamSqlCurrentDateExpressionTest.java | 38 ++
.../date/BeamSqlCurrentTimeExpressionTest.java | 39 ++
.../BeamSqlCurrentTimestampExpressionTest.java | 39 ++
.../date/BeamSqlDateCeilExpressionTest.java | 50 +++
.../date/BeamSqlDateExpressionTestBase.java | 51 +++
.../date/BeamSqlDateFloorExpressionTest.java | 49 ++
.../date/BeamSqlExtractExpressionTest.java | 103 +++++
.../logical/BeamSqlNotExpressionTest.java | 47 ++
.../math/BeamSqlMathBinaryExpressionTest.java | 201 +++++++++
.../math/BeamSqlMathUnaryExpressionTest.java | 309 +++++++++++++
.../string/BeamSqlCharLengthExpressionTest.java | 44 ++
.../string/BeamSqlConcatExpressionTest.java | 66 +++
.../string/BeamSqlInitCapExpressionTest.java | 54 +++
.../string/BeamSqlLowerExpressionTest.java | 44 ++
.../string/BeamSqlOverlayExpressionTest.java | 87 ++++
.../string/BeamSqlPositionExpressionTest.java | 84 ++++
.../BeamSqlStringUnaryExpressionTest.java | 52 +++
.../string/BeamSqlSubstringExpressionTest.java | 101 +++++
.../string/BeamSqlTrimExpressionTest.java | 103 +++++
.../string/BeamSqlUpperExpressionTest.java | 44 ++
.../sql/impl/rel/BeamIntersectRelTest.java | 119 +++++
.../rel/BeamJoinRelBoundedVsBoundedTest.java | 204 +++++++++
.../rel/BeamJoinRelUnboundedVsBoundedTest.java | 241 ++++++++++
.../BeamJoinRelUnboundedVsUnboundedTest.java | 219 +++++++++
.../sql/impl/rel/BeamMinusRelTest.java | 118 +++++
.../impl/rel/BeamSetOperatorRelBaseTest.java | 106 +++++
.../sql/impl/rel/BeamSortRelTest.java | 237 ++++++++++
.../sql/impl/rel/BeamUnionRelTest.java | 104 +++++
.../sql/impl/rel/BeamValuesRelTest.java | 105 +++++
.../sdk/extensions/sql/impl/rel/CheckSize.java | 41 ++
.../sql/interpreter/BeamSqlFnExecutorTest.java | 416 -----------------
.../interpreter/BeamSqlFnExecutorTestBase.java | 92 ----
.../operator/BeamNullExperssionTest.java | 55 ---
.../operator/BeamSqlAndOrExpressionTest.java | 61 ---
.../operator/BeamSqlCaseExpressionTest.java | 93 ----
.../operator/BeamSqlCastExpressionTest.java | 125 ------
.../operator/BeamSqlCompareExpressionTest.java | 115 -----
.../operator/BeamSqlInputRefExpressionTest.java | 57 ---
.../operator/BeamSqlPrimitiveTest.java | 59 ---
.../BeamSqlReinterpretExpressionTest.java | 75 ----
.../operator/BeamSqlUdfExpressionTest.java | 51 ---
.../BeamSqlArithmeticExpressionTest.java | 237 ----------
.../date/BeamSqlCurrentDateExpressionTest.java | 38 --
.../date/BeamSqlCurrentTimeExpressionTest.java | 39 --
.../BeamSqlCurrentTimestampExpressionTest.java | 39 --
.../date/BeamSqlDateCeilExpressionTest.java | 50 ---
.../date/BeamSqlDateExpressionTestBase.java | 51 ---
.../date/BeamSqlDateFloorExpressionTest.java | 49 --
.../date/BeamSqlExtractExpressionTest.java | 103 -----
.../logical/BeamSqlNotExpressionTest.java | 47 --
.../math/BeamSqlMathBinaryExpressionTest.java | 201 ---------
.../math/BeamSqlMathUnaryExpressionTest.java | 309 -------------
.../string/BeamSqlCharLengthExpressionTest.java | 44 --
.../string/BeamSqlConcatExpressionTest.java | 66 ---
.../string/BeamSqlInitCapExpressionTest.java | 54 ---
.../string/BeamSqlLowerExpressionTest.java | 44 --
.../string/BeamSqlOverlayExpressionTest.java | 87 ----
.../string/BeamSqlPositionExpressionTest.java | 84 ----
.../BeamSqlStringUnaryExpressionTest.java | 52 ---
.../string/BeamSqlSubstringExpressionTest.java | 101 -----
.../string/BeamSqlTrimExpressionTest.java | 103 -----
.../string/BeamSqlUpperExpressionTest.java | 44 --
.../sql/rel/BeamIntersectRelTest.java | 119 -----
.../rel/BeamJoinRelBoundedVsBoundedTest.java | 204 ---------
.../rel/BeamJoinRelUnboundedVsBoundedTest.java | 241 ----------
.../BeamJoinRelUnboundedVsUnboundedTest.java | 219 ---------
.../extensions/sql/rel/BeamMinusRelTest.java | 118 -----
.../sql/rel/BeamSetOperatorRelBaseTest.java | 106 -----
.../sdk/extensions/sql/rel/BeamSortRelTest.java | 237 ----------
.../extensions/sql/rel/BeamUnionRelTest.java | 104 -----
.../extensions/sql/rel/BeamValuesRelTest.java | 105 -----
.../beam/sdk/extensions/sql/rel/CheckSize.java | 41 --
.../sql/schema/BeamSqlRowCoderTest.java | 2 +-
.../sql/schema/kafka/BeamKafkaCSVTableTest.java | 4 +-
.../sql/schema/text/BeamTextCSVTableTest.java | 4 +-
.../transform/BeamAggregationTransformTest.java | 6 +-
.../schema/transform/BeamTransformBaseTest.java | 4 +-
340 files changed, 13117 insertions(+), 13117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index d64ae41..e0d7a78 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 714e102..3bea46a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
index ca73b13..be0b0af 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -18,12 +18,12 @@
package org.apache.beam.sdk.extensions.sql;
import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.rel.type.RelDataType;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
new file mode 100644
index 0000000..1ae6bb3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+
+/**
+ * {@code BeamSqlExpressionExecutor} fills the gap between relational
+ * expressions in Calcite SQL and executable code.
+ * Implementations must be serializable, since this interface extends {@link Serializable}.
+ */
+public interface BeamSqlExpressionExecutor extends Serializable {
+
+ /**
+ * Invoked before data processing.
+ */
+ void prepare();
+
+ /**
+ * Applies the transformation to the input record {@link BeamSqlRow}.
+ * @return the transformation output for {@code inputRow}, as a list of values
+ */
+ List<Object> execute(BeamSqlRow inputRow);
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
new file mode 100644
index 0000000..1f9e0e3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlReinterpretExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUdfExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAbsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAcosExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAsinExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAtan2Expression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAtanExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCeilExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCosExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCotExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlDegreesExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlExpExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlLnExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlLogExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlPiExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlPowerExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRadiansExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandIntegerExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRoundExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSignExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSinExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTanExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
+ * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
+ * which can be evaluated against the {@link BeamSqlRow}.
+ *
+ */
+public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
+ protected List<BeamSqlExpression> exps;
+
+  /**
+   * Builds the list of expressions to evaluate for the given relational node.
+   *
+   * <p>A {@link BeamFilterRel} contributes a single expression built from its condition;
+   * a {@link BeamProjectRel} contributes one expression per projected field, in order.
+   *
+   * @param relNode the filter or project node to translate
+   * @throws UnsupportedOperationException if {@code relNode} is any other kind of node
+   */
+  public BeamSqlFnExecutor(BeamRelNode relNode) {
+    this.exps = new ArrayList<>();
+
+    if (relNode instanceof BeamFilterRel) {
+      RexNode filterCondition = ((BeamFilterRel) relNode).getCondition();
+      exps.add(buildExpression(filterCondition));
+      return;
+    }
+
+    if (relNode instanceof BeamProjectRel) {
+      for (RexNode projectedField : ((BeamProjectRel) relNode).getProjects()) {
+        exps.add(buildExpression(projectedField));
+      }
+      return;
+    }
+
+    throw new UnsupportedOperationException(
+        String.format("%s is not supported yet!", relNode.getClass().toString()));
+  }
+
+  /**
+   * Converts a Calcite {@link RexNode} into the equivalent {@link BeamSqlExpression}.
+   *
+   * <p>{@link #buildExpression(RexNode)} visits the operands of a {@link RexCall} recursively,
+   * and represents each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
+   * A {@link RexLiteral} becomes a {@link BeamSqlPrimitive} and a {@link RexInputRef} becomes a
+   * {@link BeamSqlInputRefExpression}.
+   *
+   * @param rexNode the Calcite expression to translate
+   * @return the translated expression
+   * @throws UnsupportedOperationException if the node class or the operator is not supported
+   * @throws IllegalStateException if the built expression rejects its operands, or if a
+   *     DECIMAL-stored literal declares a type this method cannot convert to
+   */
+  static BeamSqlExpression buildExpression(RexNode rexNode) {
+    BeamSqlExpression ret = null;
+    if (rexNode instanceof RexLiteral) {
+      RexLiteral node = (RexLiteral) rexNode;
+      SqlTypeName type = node.getTypeName();
+      Object value = node.getValue();
+
+      if (SqlTypeName.CHAR_TYPES.contains(type)
+          && value instanceof NlsString) {
+        // NlsString is not serializable, we need to convert
+        // it to string explicitly.
+        return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
+      } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
+        // Calcite uses Calendar as the java type of a DATE literal;
+        // unwrap it to the java.util.Date it represents.
+        return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
+      } else {
+        // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different
+        // e.g. sql: "select 1"
+        // here the literal 1 will be parsed as a RexLiteral where:
+        //     node.getType().getSqlTypeName() = INTEGER (the display type)
+        //     node.getSqlTypeName() = DECIMAL (the actual internal storage format)
+        // So we need to do a convert here.
+        // check RexBuilder#makeLiteral for more information.
+        SqlTypeName realType = node.getType().getSqlTypeName();
+        Object realValue = value;
+        if (type == SqlTypeName.DECIMAL) {
+          BigDecimal rawValue = (BigDecimal) value;
+          switch (realType) {
+            case TINYINT:
+              realValue = (byte) rawValue.intValue();
+              break;
+            case SMALLINT:
+              realValue = (short) rawValue.intValue();
+              break;
+            case INTEGER:
+              realValue = rawValue.intValue();
+              break;
+            case BIGINT:
+              realValue = rawValue.longValue();
+              break;
+            case DECIMAL:
+              realValue = rawValue;
+              break;
+            default:
+              throw new IllegalStateException("type/realType mismatch: "
+                  + type + " VS " + realType);
+          }
+        } else if (type == SqlTypeName.DOUBLE && value != null) {
+          // Calcite keeps approximate numeric literals as BigDecimal internally, so the
+          // value must not be cast to Double; go through Number, which covers both
+          // representations, and narrow to the declared type.
+          Number rawValue = (Number) value;
+          if (realType == SqlTypeName.FLOAT) {
+            realValue = rawValue.floatValue();
+          } else if (realType == SqlTypeName.DOUBLE) {
+            realValue = rawValue.doubleValue();
+          }
+        }
+        return BeamSqlPrimitive.of(realType, realValue);
+      }
+    } else if (rexNode instanceof RexInputRef) {
+      RexInputRef node = (RexInputRef) rexNode;
+      ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+    } else if (rexNode instanceof RexCall) {
+      RexCall node = (RexCall) rexNode;
+      String opName = node.op.getName();
+      // translate the operands first; every operator below is built on top of them
+      List<BeamSqlExpression> subExps = new ArrayList<>();
+      for (RexNode subNode : node.getOperands()) {
+        subExps.add(buildExpression(subNode));
+      }
+      switch (opName) {
+        // logical operators
+        case "AND":
+          ret = new BeamSqlAndExpression(subExps);
+          break;
+        case "OR":
+          ret = new BeamSqlOrExpression(subExps);
+          break;
+        case "NOT":
+          ret = new BeamSqlNotExpression(subExps);
+          break;
+        case "=":
+          ret = new BeamSqlEqualsExpression(subExps);
+          break;
+        case "<>":
+          ret = new BeamSqlNotEqualsExpression(subExps);
+          break;
+        case ">":
+          ret = new BeamSqlGreaterThanExpression(subExps);
+          break;
+        case ">=":
+          ret = new BeamSqlGreaterThanOrEqualsExpression(subExps);
+          break;
+        case "<":
+          ret = new BeamSqlLessThanExpression(subExps);
+          break;
+        case "<=":
+          ret = new BeamSqlLessThanOrEqualsExpression(subExps);
+          break;
+
+        // arithmetic operators
+        case "+":
+          ret = new BeamSqlPlusExpression(subExps);
+          break;
+        case "-":
+          ret = new BeamSqlMinusExpression(subExps);
+          break;
+        case "*":
+          ret = new BeamSqlMultiplyExpression(subExps);
+          break;
+        case "/":
+        case "/INT":
+          ret = new BeamSqlDivideExpression(subExps);
+          break;
+        case "MOD":
+          ret = new BeamSqlModExpression(subExps);
+          break;
+
+        // math functions
+        case "ABS":
+          ret = new BeamSqlAbsExpression(subExps);
+          break;
+        case "ROUND":
+          ret = new BeamSqlRoundExpression(subExps);
+          break;
+        case "LN":
+          ret = new BeamSqlLnExpression(subExps);
+          break;
+        case "LOG10":
+          ret = new BeamSqlLogExpression(subExps);
+          break;
+        case "EXP":
+          ret = new BeamSqlExpExpression(subExps);
+          break;
+        case "ACOS":
+          ret = new BeamSqlAcosExpression(subExps);
+          break;
+        case "ASIN":
+          ret = new BeamSqlAsinExpression(subExps);
+          break;
+        case "ATAN":
+          ret = new BeamSqlAtanExpression(subExps);
+          break;
+        case "COT":
+          ret = new BeamSqlCotExpression(subExps);
+          break;
+        case "DEGREES":
+          ret = new BeamSqlDegreesExpression(subExps);
+          break;
+        case "RADIANS":
+          ret = new BeamSqlRadiansExpression(subExps);
+          break;
+        case "COS":
+          ret = new BeamSqlCosExpression(subExps);
+          break;
+        case "SIN":
+          ret = new BeamSqlSinExpression(subExps);
+          break;
+        case "TAN":
+          ret = new BeamSqlTanExpression(subExps);
+          break;
+        case "SIGN":
+          ret = new BeamSqlSignExpression(subExps);
+          break;
+        case "POWER":
+          ret = new BeamSqlPowerExpression(subExps);
+          break;
+        case "PI":
+          ret = new BeamSqlPiExpression();
+          break;
+        case "ATAN2":
+          ret = new BeamSqlAtan2Expression(subExps);
+          break;
+        case "TRUNCATE":
+          ret = new BeamSqlTruncateExpression(subExps);
+          break;
+        case "RAND":
+          ret = new BeamSqlRandExpression(subExps);
+          break;
+        case "RAND_INTEGER":
+          ret = new BeamSqlRandIntegerExpression(subExps);
+          break;
+
+        // string operators
+        case "||":
+          ret = new BeamSqlConcatExpression(subExps);
+          break;
+        case "POSITION":
+          ret = new BeamSqlPositionExpression(subExps);
+          break;
+        case "CHAR_LENGTH":
+        case "CHARACTER_LENGTH":
+          ret = new BeamSqlCharLengthExpression(subExps);
+          break;
+        case "UPPER":
+          ret = new BeamSqlUpperExpression(subExps);
+          break;
+        case "LOWER":
+          ret = new BeamSqlLowerExpression(subExps);
+          break;
+        case "TRIM":
+          ret = new BeamSqlTrimExpression(subExps);
+          break;
+        case "SUBSTRING":
+          ret = new BeamSqlSubstringExpression(subExps);
+          break;
+        case "OVERLAY":
+          ret = new BeamSqlOverlayExpression(subExps);
+          break;
+        case "INITCAP":
+          ret = new BeamSqlInitCapExpression(subExps);
+          break;
+
+        // date functions
+        // NOTE(review): the branches from here down to CURRENT_DATE return directly,
+        // so they skip the accept() check at the end of this method -- confirm whether
+        // that is intended before changing it.
+        case "Reinterpret":
+          return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName());
+        case "CEIL":
+          // CEIL/FLOOR are shared by numbers and dates; dispatch on the call's result type
+          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+            return new BeamSqlCeilExpression(subExps);
+          } else {
+            return new BeamSqlDateCeilExpression(subExps);
+          }
+        case "FLOOR":
+          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+            return new BeamSqlFloorExpression(subExps);
+          } else {
+            return new BeamSqlDateFloorExpression(subExps);
+          }
+        case "EXTRACT_DATE":
+        case "EXTRACT":
+          return new BeamSqlExtractExpression(subExps);
+
+        case "LOCALTIME":
+        case "CURRENT_TIME":
+          return new BeamSqlCurrentTimeExpression(subExps);
+
+        case "CURRENT_TIMESTAMP":
+        case "LOCALTIMESTAMP":
+          return new BeamSqlCurrentTimestampExpression(subExps);
+
+        case "CURRENT_DATE":
+          return new BeamSqlCurrentDateExpression();
+
+
+        case "CASE":
+          ret = new BeamSqlCaseExpression(subExps);
+          break;
+        case "CAST":
+          ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName());
+          break;
+
+        case "IS NULL":
+          ret = new BeamSqlIsNullExpression(subExps.get(0));
+          break;
+        case "IS NOT NULL":
+          ret = new BeamSqlIsNotNullExpression(subExps.get(0));
+          break;
+
+        // window functions
+        case "HOP":
+        case "TUMBLE":
+        case "SESSION":
+          ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+          break;
+        case "HOP_START":
+        case "TUMBLE_START":
+        case "SESSION_START":
+          ret = new BeamSqlWindowStartExpression();
+          break;
+        case "HOP_END":
+        case "TUMBLE_END":
+        case "SESSION_END":
+          ret = new BeamSqlWindowEndExpression();
+          break;
+        default:
+          //handle UDF
+          if (node.getOperator() instanceof SqlUserDefinedFunction) {
+            SqlUserDefinedFunction udf = (SqlUserDefinedFunction) node.getOperator();
+            ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+            ret = new BeamSqlUdfExpression(fn.method, subExps,
+                node.type.getSqlTypeName());
+          } else {
+            throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet!");
+          }
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported yet!", rexNode.getClass().toString()));
+    }
+
+    // let the freshly built expression validate its own operands
+    if (ret != null && !ret.accept()) {
+      throw new IllegalStateException(ret.getClass().getSimpleName()
+          + " does not accept the operands.(" + rexNode + ")");
+    }
+
+    return ret;
+  }
+
+ @Override
+ public void prepare() {
+ // No-op: this implementation performs no work in prepare().
+ }
+
+ @Override
+ public List<Object> execute(BeamSqlRow inputRow) {
+ List<Object> results = new ArrayList<>();
+ for (BeamSqlExpression exp : exps) {
+ results.add(exp.evaluate(inputRow).getValue());
+ }
+ return results;
+ }
+
+ @Override
+ public void close() {
+ // No-op: this implementation performs no work in close().
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
new file mode 100644
index 0000000..61e8aae
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlCaseExpression} represents CASE, NULLIF, COALESCE in SQL.
+ */
+public class BeamSqlCaseExpression extends BeamSqlExpression {
+ public BeamSqlCaseExpression(List<BeamSqlExpression> operands) {
+ // the return type of CASE is the type of the `else` condition
+ super(operands, operands.get(operands.size() - 1).getOutputType());
+ }
+
+ @Override public boolean accept() {
+ // `when`-`then` pair + `else`
+ if (operands.size() % 2 != 1) {
+ return false;
+ }
+
+ for (int i = 0; i < operands.size() - 1; i += 2) {
+ if (opType(i) != SqlTypeName.BOOLEAN) {
+ return false;
+ } else if (opType(i + 1) != outputType) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ for (int i = 0; i < operands.size() - 1; i += 2) {
+ if (opValueEvaluated(i, inputRow)) {
+ return BeamSqlPrimitive.of(
+ outputType,
+ opValueEvaluated(i + 1, inputRow)
+ );
+ }
+ }
+ return BeamSqlPrimitive.of(outputType,
+ opValueEvaluated(operands.size() - 1, inputRow));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
new file mode 100644
index 0000000..c98c10d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+import org.joda.time.format.DateTimeParser;
+
+/**
+ * Base class to support 'CAST' operations for all {@link SqlTypeName}.
+ */
+public class BeamSqlCastExpression extends BeamSqlExpression {
+
+ private static final int index = 0;
+ private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss";
+ private static final String outputDateFormat = "yyyy-MM-dd";
+ /**
+ * Date and Timestamp formats used to parse
+ * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}.
+ */
+ private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
+ .append(null/*printer*/, new DateTimeParser[] {
+ // date formats
+ DateTimeFormat.forPattern("yy-MM-dd").getParser(),
+ DateTimeFormat.forPattern("yy/MM/dd").getParser(),
+ DateTimeFormat.forPattern("yy.MM.dd").getParser(),
+ DateTimeFormat.forPattern("yyMMdd").getParser(),
+ DateTimeFormat.forPattern("yyyyMMdd").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd").getParser(),
+ DateTimeFormat.forPattern("yyyy/MM/dd").getParser(),
+ DateTimeFormat.forPattern("yyyy.MM.dd").getParser(),
+ // datetime formats
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(),
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter()
+ .withPivotYear(2020);
+
+ public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) {
+ super(operands, castType);
+ }
+
+ @Override
+ public boolean accept() {
+ return numberOfOperands() == 1;
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ SqlTypeName castOutputType = getOutputType();
+ switch (castOutputType) {
+ case INTEGER:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
+ case DOUBLE:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
+ case SMALLINT:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
+ case TINYINT:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
+ case BIGINT:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
+ case DECIMAL:
+ return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+ SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
+ case FLOAT:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
+ case CHAR:
+ case VARCHAR:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
+ case DATE:
+ return BeamSqlPrimitive
+ .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
+ case TIMESTAMP:
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
+ }
+ throw new UnsupportedOperationException(
+ String.format("Cast to type %s not supported", castOutputType));
+ }
+
+ private Date toDate(Object inputDate, String outputFormat) {
+ try {
+ return Date
+ .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat));
+ } catch (IllegalArgumentException | UnsupportedOperationException e) {
+ throw new UnsupportedOperationException("Can't be cast to type 'Date'");
+ }
+ }
+
+ private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) {
+ try {
+ return Timestamp.valueOf(
+ dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute()
+ .roundCeilingCopy().toString(outputFormat));
+ } catch (IllegalArgumentException | UnsupportedOperationException e) {
+ throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
new file mode 100644
index 0000000..dc5db81
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite.
+ *
+ * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression}
+ * as its operands, and return a value with type {@link SqlTypeName}.
+ *
+ * <p>Leaf expressions may be created without operands, in which case {@link #getOperands()}
+ * returns {@code null} and {@link #numberOfOperands()} returns {@code 0}.
+ */
+public abstract class BeamSqlExpression implements Serializable {
+  protected List<BeamSqlExpression> operands;
+  protected SqlTypeName outputType;
+
+  protected BeamSqlExpression(){}
+
+  public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    this.operands = operands;
+    this.outputType = outputType;
+  }
+
+  /** Returns the operand at position {@code idx}. */
+  public BeamSqlExpression op(int idx) {
+    return operands.get(idx);
+  }
+
+  /** Returns the output type of the operand at position {@code idx}. */
+  public SqlTypeName opType(int idx) {
+    return op(idx).getOutputType();
+  }
+
+  /**
+   * Evaluates the operand at position {@code idx} against {@code row} and returns its
+   * unwrapped value, cast to the type the caller expects.
+   */
+  @SuppressWarnings("unchecked") // the caller picks T; the value type is not checked here
+  public <T> T opValueEvaluated(int idx, BeamSqlRow row) {
+    return (T) op(idx).evaluate(row).getValue();
+  }
+
+  /**
+   * assertion to make sure the input and output are supported in this expression.
+   */
+  public abstract boolean accept();
+
+  /**
+   * Apply input record {@link BeamSqlRow} to this expression,
+   * the output value is wrapped with {@link BeamSqlPrimitive}.
+   */
+  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
+
+  public List<BeamSqlExpression> getOperands() {
+    return operands;
+  }
+
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+
+  /**
+   * Returns how many operands this expression has; {@code 0} when it was created
+   * without an operand list.
+   */
+  public int numberOfOperands() {
+    // leaf expressions are built with a null operand list
+    return operands == null ? 0 : operands.size();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
new file mode 100644
index 0000000..7aba024
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * An primitive operation for direct field extraction.
+ */
+public class BeamSqlInputRefExpression extends BeamSqlExpression {
+ private int inputRef;
+
+ public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) {
+ super(null, sqlTypeName);
+ this.inputRef = inputRef;
+ }
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
new file mode 100644
index 0000000..6380af9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * {@link BeamSqlPrimitive} is a special, self-referencing {@link BeamSqlExpression}.
+ *
+ * <p>It simply holds a value together with its SQL type and returns itself from
+ * {@link #evaluate(BeamSqlRow)}. Instances are created through {@link #of(SqlTypeName, Object)},
+ * which rejects values whose Java class does not match the SQL type.
+ */
+public class BeamSqlPrimitive<T> extends BeamSqlExpression {
+  private T value;
+
+  private BeamSqlPrimitive() {
+  }
+
+  private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * A builder function to create from Type and value directly.
+   *
+   * @throws IllegalArgumentException if {@code value} is not of the Java class required by
+   *     {@code outputType}
+   * @throws UnsupportedOperationException if {@code value} is non-null and {@code outputType}
+   *     is not one of the types handled by {@link #accept()}
+   */
+  public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value) {
+    BeamSqlPrimitive<T> primitive = new BeamSqlPrimitive<>();
+    primitive.outputType = outputType;
+    primitive.value = value;
+    if (primitive.accept()) {
+      return primitive;
+    }
+    throw new IllegalArgumentException(
+        String.format("value [%s] doesn't match type [%s].", value, outputType));
+  }
+
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+
+  public T getValue() {
+    return value;
+  }
+
+  // The typed getters below cast the held value; they fail with ClassCastException when the
+  // value is of another class, and the primitive-returning ones fail with
+  // NullPointerException on a null value (auto-unboxing).
+
+  public long getLong() {
+    return (Long) value;
+  }
+
+  public double getDouble() {
+    return (Double) value;
+  }
+
+  public float getFloat() {
+    return (Float) value;
+  }
+
+  public int getInteger() {
+    return (Integer) value;
+  }
+
+  public short getShort() {
+    return (Short) value;
+  }
+
+  public byte getByte() {
+    return (Byte) value;
+  }
+
+  public boolean getBoolean() {
+    return (Boolean) value;
+  }
+
+  public String getString() {
+    return (String) value;
+  }
+
+  public Date getDate() {
+    return (Date) value;
+  }
+
+  public BigDecimal getDecimal() {
+    return (BigDecimal) value;
+  }
+
+  /**
+   * Checks that the held value is an instance of the Java class used for the SQL type.
+   *
+   * @throws UnsupportedOperationException for a non-null value of a SQL type not listed below
+   */
+  @Override
+  public boolean accept() {
+    // A null value is accepted for every type, before the type is even looked at.
+    if (value == null) {
+      return true;
+    }
+
+    switch (outputType) {
+      case TINYINT:
+        return value instanceof Byte;
+      case SMALLINT:
+        return value instanceof Short;
+      case INTEGER:
+        return value instanceof Integer;
+      case BIGINT:
+        return value instanceof Long;
+      case FLOAT:
+        return value instanceof Float;
+      case DOUBLE:
+        return value instanceof Double;
+      case DECIMAL:
+      case INTERVAL_HOUR:
+      case INTERVAL_MINUTE:
+        return value instanceof BigDecimal;
+      case BOOLEAN:
+        return value instanceof Boolean;
+      case CHAR:
+      case VARCHAR:
+        return value instanceof String || value instanceof NlsString;
+      case TIME:
+        return value instanceof GregorianCalendar;
+      case DATE:
+      case TIMESTAMP:
+        return value instanceof Date;
+      case SYMBOL:
+        // for SYMBOL, it supports anything...
+        return true;
+      default:
+        throw new UnsupportedOperationException(outputType.name());
+    }
+  }
+
+  /** Returns this very instance; the row is not consulted. */
+  @Override
+  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
+    return this;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
new file mode 100644
index 0000000..243baaa
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for REINTERPRET.
+ *
+ * <p>Currently the only supported conversion is from one of the
+ * {@link SqlTypeName#DATETIME_TYPES} to {@code BIGINT}; the result is the operand's
+ * instant expressed in milliseconds.
+ */
+public class BeamSqlReinterpretExpression extends BeamSqlExpression {
+  public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /** Accepts exactly one datetime operand with a {@code BIGINT} output type. */
+  @Override
+  public boolean accept() {
+    if (getOperands().size() != 1) {
+      return false;
+    }
+    if (outputType != SqlTypeName.BIGINT) {
+      return false;
+    }
+    return SqlTypeName.DATETIME_TYPES.contains(opType(0));
+  }
+
+  /** Evaluates the operand and returns its millisecond value. */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    if (opType(0) != SqlTypeName.TIME) {
+      Date instant = opValueEvaluated(0, inputRow);
+      return BeamSqlPrimitive.of(outputType, instant.getTime());
+    }
+
+    // TIME operands are read as calendars rather than dates.
+    GregorianCalendar calendar = opValueEvaluated(0, inputRow);
+    return BeamSqlPrimitive.of(outputType, calendar.getTimeInMillis());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
new file mode 100644
index 0000000..eebb97c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * invoke a UDF function.
+ */
+public class BeamSqlUdfExpression extends BeamSqlExpression {
+ //as Method is not Serializable, need to keep class/method information, and rebuild it.
+ private transient Method method;
+ private String className;
+ private String methodName;
+ private List<String> paraClassName = new ArrayList<>();
+
+ public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps,
+ SqlTypeName sqlTypeName) {
+ super(subExps, sqlTypeName);
+ this.method = method;
+
+ this.className = method.getDeclaringClass().getName();
+ this.methodName = method.getName();
+ for (Class<?> c : method.getParameterTypes()) {
+ paraClassName.add(c.getName());
+ }
+ }
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+ if (method == null) {
+ reConstructMethod();
+ }
+ try {
+ List<Object> paras = new ArrayList<>();
+ for (BeamSqlExpression e : getOperands()) {
+ paras.add(e.evaluate(inputRow).getValue());
+ }
+
+ return BeamSqlPrimitive.of(getOutputType(),
+ method.invoke(null, paras.toArray(new Object[]{})));
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * re-construct method from class/method.
+ */
+ private void reConstructMethod() {
+ try {
+ List<Class<?>> paraClass = new ArrayList<>();
+ for (String pc : paraClassName) {
+ paraClass.add(Class.forName(pc));
+ }
+ method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {}));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
new file mode 100644
index 0000000..0bd68df
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operation.
+ *
+ * <p>These operators returns the <em>end</em> timestamp of window.
+ */
+public class BeamSqlWindowEndExpression extends BeamSqlExpression {
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ new Date(inputRow.getWindowEnd().getMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
new file mode 100644
index 0000000..b560ef8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operation.
+ *
+ * <p>These functions don't change the timestamp field, instead it's used to indicate
+ * the event_timestamp field, and how the window is defined.
+ */
+public class BeamSqlWindowExpression extends BeamSqlExpression {
+
+ public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+
+ @Override
+ public boolean accept() {
+ return operands.get(0).getOutputType().equals(SqlTypeName.DATE)
+ || operands.get(0).getOutputType().equals(SqlTypeName.TIME)
+ || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP);
+ }
+
+ @Override
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ (Date) operands.get(0).evaluate(inputRow).getValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
new file mode 100644
index 0000000..e2c1b34
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START},
+ * {@code SESSION_START} operation.
+ *
+ * <p>These operators returns the <em>start</em> timestamp of window.
+ */
+public class BeamSqlWindowStartExpression extends BeamSqlExpression {
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ new Date(inputRow.getWindowStart().getMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
new file mode 100644
index 0000000..b07b28f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for all arithmetic operators.
+ *
+ * <p>Both operands are converted to {@link BigDecimal}, combined by the subclass-provided
+ * {@link #calc(BigDecimal, BigDecimal)}, and the result is narrowed to the Java type that
+ * corresponds to the expression's output type.
+ */
+public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
+  // Numeric types in the order used by deduceOutputType: the operand type that appears
+  // later in this list wins.
+  private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>();
+  static {
+    ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL);
+  }
+
+  /** Creates an expression whose output type is deduced from its first two operands. */
+  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
+    super(operands, deduceOutputType(operands.get(0).getOutputType(),
+        operands.get(1).getOutputType()));
+  }
+
+  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * Evaluates both operands, applies {@link #calc(BigDecimal, BigDecimal)} and converts the
+   * result to the output type.
+   */
+  @Override
+  public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = opValueEvaluated(0, inputRow);
+    Object rightValue = opValueEvaluated(1, inputRow);
+
+    BigDecimal result = calc(toBigDecimal(leftValue), toBigDecimal(rightValue));
+    return getCorrectlyTypedResult(result);
+  }
+
+  /**
+   * Converts an operand value to {@link BigDecimal}.
+   *
+   * <p>Values are converted through {@code double}, as before. That round trip silently
+   * corrupts {@code Long} values beyond 2^53 and {@code BigDecimal} values with more digits
+   * than a double can hold, so for those two classes the exact value is used whenever the
+   * round trip would alter it (or overflow to infinity). Values that survive the round trip
+   * yield exactly the same {@code BigDecimal}, scale included, as the double-based
+   * conversion.
+   */
+  private static BigDecimal toBigDecimal(Object value) {
+    String text = value.toString();
+    if (value instanceof BigDecimal || value instanceof Long) {
+      BigDecimal exact = new BigDecimal(text);
+      double approx = Double.parseDouble(text);
+      if (Double.isInfinite(approx) || BigDecimal.valueOf(approx).compareTo(exact) != 0) {
+        return exact;
+      }
+      return BigDecimal.valueOf(approx);
+    }
+    return BigDecimal.valueOf(Double.valueOf(text));
+  }
+
+  /** Performs the actual arithmetic operation on the two converted operands. */
+  protected abstract BigDecimal calc(BigDecimal left, BigDecimal right);
+
+  /**
+   * Picks the result type for two operand types: {@code DOUBLE} when {@code FLOAT} meets
+   * {@code DECIMAL}, otherwise whichever type comes later in the internal type ordering
+   * (the left type on a tie).
+   */
+  protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) {
+    int leftIndex = ORDERED_APPROX_TYPES.indexOf(left);
+    int rightIndex = ORDERED_APPROX_TYPES.indexOf(right);
+    if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT)
+        && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) {
+      return SqlTypeName.DOUBLE;
+    }
+
+    return leftIndex < rightIndex ? right : left;
+  }
+
+  /** Accepts exactly two operands, both of a numeric type. */
+  @Override
+  public boolean accept() {
+    if (operands.size() != 2) {
+      return false;
+    }
+
+    for (BeamSqlExpression operand : operands) {
+      if (!SqlTypeName.NUMERIC_TYPES.contains(operand.getOutputType())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Narrows {@code rawResult} to the Java number class that matches the output type; for
+   * {@code DECIMAL} and any other type the {@link BigDecimal} is passed through unchanged.
+   */
+  protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) {
+    Number actualValue;
+    switch (outputType) {
+      case TINYINT:
+        actualValue = rawResult.byteValue();
+        break;
+      case SMALLINT:
+        actualValue = rawResult.shortValue();
+        break;
+      case INTEGER:
+        actualValue = rawResult.intValue();
+        break;
+      case BIGINT:
+        actualValue = rawResult.longValue();
+        break;
+      case FLOAT:
+        actualValue = rawResult.floatValue();
+        break;
+      case DOUBLE:
+        actualValue = rawResult.doubleValue();
+        break;
+      case DECIMAL:
+      default:
+        actualValue = rawResult;
+    }
+    return BeamSqlPrimitive.of(outputType, actualValue);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
new file mode 100644
index 0000000..d62a3f8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+
+/**
+ * The '/' operator.
+ */
+public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
+  /** Number of fractional digits kept in the quotient. */
+  private static final int QUOTIENT_SCALE = 10;
+
+  public BeamSqlDivideExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /** Divides {@code left} by {@code right}, rounding half-even at the quotient scale. */
+  @Override
+  protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.divide(right, QUOTIENT_SCALE, RoundingMode.HALF_EVEN);
+  }
+}