You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/10 17:03:26 UTC
[1/2] beam git commit: [BEAM-2006] window support Add support for
aggregation: global, HOP, TUMBLE, SESSION, only aggregation function COUNT
Repository: beam
Updated Branches:
refs/heads/DSL_SQL 7e918a76e -> 6729a027d
[BEAM-2006] window support
Add support for aggregation: global, HOP, TUMBLE, SESSION, only aggregation function COUNT
fix typo
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c6d60fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c6d60fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c6d60fb
Branch: refs/heads/DSL_SQL
Commit: 3c6d60fb2f473b1860cfeca0f740ec817499d085
Parents: 7e918a7
Author: mingmxu <mi...@ebay.com>
Authored: Tue May 9 23:04:17 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Tue May 9 23:15:58 2017 -0700
----------------------------------------------------------------------
dsls/sql/pom.xml | 6 +-
.../dsls/sql/interpreter/BeamSQLFnExecutor.java | 20 ++-
.../interpreter/operator/BeamSqlPrimitive.java | 10 ++
.../operator/BeamSqlWindowEndExpression.java | 42 +++++
.../operator/BeamSqlWindowExpression.java | 50 ++++++
.../operator/BeamSqlWindowStartExpression.java | 43 +++++
.../dsls/sql/planner/BeamPipelineCreator.java | 17 +-
.../beam/dsls/sql/planner/BeamRuleSets.java | 4 +-
.../beam/dsls/sql/rel/BeamAggregationRel.java | 154 ++++++++++++++++++
.../apache/beam/dsls/sql/rel/BeamFilterRel.java | 4 +-
.../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 2 +-
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 2 +-
.../beam/dsls/sql/rel/BeamProjectRel.java | 4 +-
.../beam/dsls/sql/rule/BeamAggregationRule.java | 163 +++++++++++++++++++
.../beam/dsls/sql/schema/BeamSQLRecordType.java | 5 +
.../apache/beam/dsls/sql/schema/BeamSQLRow.java | 64 +++++++-
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 21 ++-
.../sql/transform/BeamAggregationTransform.java | 120 ++++++++++++++
.../dsls/sql/transform/BeamSQLProjectFn.java | 8 +-
.../beam/dsls/sql/planner/BasePlanner.java | 61 ++++---
.../sql/planner/BeamGroupByExplainTest.java | 93 +++++++++++
.../sql/planner/BeamGroupByPipelineTest.java | 94 +++++++++++
.../sql/planner/BeamInvalidGroupByTest.java | 44 +++++
.../BeamPlannerAggregationSubmitTest.java | 136 ++++++++++++++++
.../sql/planner/BeamPlannerExplainTest.java | 6 +-
.../dsls/sql/planner/BeamPlannerSubmitTest.java | 12 +-
.../dsls/sql/planner/MockedBeamSQLTable.java | 38 ++---
27 files changed, 1137 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index 15692e9..bc658e6 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -34,7 +34,7 @@
<properties>
<timestamp>${maven.build.timestamp}</timestamp>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
- <calcite-version>1.11.0</calcite-version>
+ <calcite-version>1.12.0</calcite-version>
</properties>
<build>
@@ -209,5 +209,9 @@
<artifactId>commons-csv</artifactId>
<version>1.4</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
index a14d347..4ae7b33 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
@@ -34,6 +34,9 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression;
import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
@@ -159,8 +162,21 @@ public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
case "IS NULL":
return new BeamSqlIsNullExpression(subExps.get(0));
- case "IS NOT NULL":
- return new BeamSqlIsNotNullExpression(subExps.get(0));
+ case "IS NOT NULL":
+ return new BeamSqlIsNotNullExpression(subExps.get(0));
+
+ case "HOP":
+ case "TUMBLE":
+ case "SESSION":
+ return new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+ case "HOP_START":
+ case "TUMBLE_START":
+ case "SESSION_START":
+ return new BeamSqlWindowStartExpression();
+ case "HOP_END":
+ case "TUMBLE_END":
+ case "SESSION_END":
+ return new BeamSqlWindowEndExpression();
default:
throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index dbe6c3c..3309577 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -18,6 +18,8 @@
package org.apache.beam.dsls.sql.interpreter.operator;
import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
import java.util.List;
import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
@@ -89,6 +91,14 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression{
case CHAR:
case VARCHAR:
return value instanceof String || value instanceof NlsString;
+ case TIME:
+ return value instanceof GregorianCalendar;
+ case TIMESTAMP:
+ return value instanceof Date;
+ case INTERVAL_HOUR:
+ return value instanceof BigDecimal;
+ case INTERVAL_MINUTE:
+ return value instanceof BigDecimal;
default:
throw new BeamSqlUnsupportedException(outputType.name());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
new file mode 100644
index 0000000..96ad81f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operations.
+ *
+ * <p>These operators return the <em>end</em> timestamp of the window.
+ */
+public class BeamSqlWindowEndExpression extends BeamSqlExpression {
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ new Date(inputRecord.getWindowEnd().getMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
new file mode 100644
index 0000000..2fb9a48
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operations.
+ *
+ * <p>These functions don't change the timestamp field; instead they indicate which field
+ * holds the event_timestamp, and how the window is defined.
+ */
+public class BeamSqlWindowExpression extends BeamSqlExpression {
+
+ public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+
+ @Override
+ public boolean accept() {
+ return operands.get(0).getOutputType().equals(SqlTypeName.DATE)
+ || operands.get(0).getOutputType().equals(SqlTypeName.TIME)
+ || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP);
+ }
+
+ @Override
+ public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ (Date) operands.get(0).evaluate(inputRecord).getValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
new file mode 100644
index 0000000..d0ac260
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START},
+ * {@code SESSION_START} operations.
+ *
+ * <p>These operators return the <em>start</em> timestamp of the window.
+ */
+public class BeamSqlWindowStartExpression extends BeamSqlExpression {
+
+ @Override
+ public boolean accept() {
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+ return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+ new Date(inputRecord.getWindowStart().getMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
index 00274a2..1d7cfd1 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
@@ -19,6 +19,9 @@ package org.apache.beam.dsls.sql.planner;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
@@ -38,7 +41,7 @@ import org.apache.beam.sdk.values.PCollection;
*/
public class BeamPipelineCreator {
private Map<String, BaseBeamTable> sourceTables;
- private PCollection<BeamSQLRow> latestStream;
+ private Queue<PCollection<BeamSQLRow>> upStreamQueue;
private PipelineOptions options;
@@ -53,18 +56,20 @@ public class BeamPipelineCreator {
.as(PipelineOptions.class); // FlinkPipelineOptions.class
options.setJobName("BeamPlanCreator");
+ upStreamQueue = new ConcurrentLinkedQueue<>();
+
pipeline = Pipeline.create(options);
CoderRegistry cr = pipeline.getCoderRegistry();
cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());
}
- public PCollection<BeamSQLRow> getLatestStream() {
- return latestStream;
+ public PCollection<BeamSQLRow> popUpstream() {
+ return upStreamQueue.poll();
}
- public void setLatestStream(PCollection<BeamSQLRow> latestStream) {
- this.latestStream = latestStream;
+ public void pushUpstream(PCollection<BeamSQLRow> upstream) {
+ this.upStreamQueue.add(upstream);
}
public Map<String, BaseBeamTable> getSourceTables() {
@@ -75,7 +80,7 @@ public class BeamPipelineCreator {
return pipeline;
}
- public boolean isHasPersistent() {
+ public boolean hasPersistent() {
return hasPersistent;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
index 2af31dc..acbd43f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet;
import java.util.Iterator;
import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.rule.BeamAggregationRule;
import org.apache.beam.dsls.sql.rule.BeamFilterRule;
import org.apache.beam.dsls.sql.rule.BeamIOSinkRule;
import org.apache.beam.dsls.sql.rule.BeamIOSourceRule;
@@ -38,7 +39,8 @@ import org.apache.calcite.tools.RuleSet;
public class BeamRuleSets {
private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet
.<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
- BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE)
+ BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
+ BeamAggregationRule.INSTANCE)
.build();
public static RuleSet[] getRuleSets() {
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
new file mode 100644
index 0000000..2c7626d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.transform.BeamAggregationTransform;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.joda.time.Duration;
+
+/**
+ * {@link BeamRelNode} to replace a {@link Aggregate} node.
+ *
+ */
+public class BeamAggregationRel extends Aggregate implements BeamRelNode {
+ private int windowFieldIdx = -1;
+ private WindowFn<BeamSQLRow, BoundedWindow> windowFn;
+ private Trigger trigger;
+ private Duration allowedLatence = Duration.ZERO;
+
+ public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits
+ , RelNode child, boolean indicator,
+ ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls
+ , WindowFn windowFn, Trigger trigger, int windowFieldIdx, Duration allowedLatence) {
+ super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+ this.windowFn = windowFn;
+ this.trigger = trigger;
+ this.windowFieldIdx = windowFieldIdx;
+ this.allowedLatence = allowedLatence;
+ }
+
+ @Override
+ public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+ RelNode input = getInput();
+ BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+
+ String stageName = BeamSQLRelUtils.getStageName(this);
+
+ PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
+ if (windowFieldIdx != -1) {
+ upstream = upstream.apply("assignEventTimestamp", WithTimestamps
+ .<BeamSQLRow>of(new BeamAggregationTransform.WindowTimestampFn(windowFieldIdx)));
+ }
+
+ PCollection<BeamSQLRow> windowStream = upstream.apply("window",
+ Window.<BeamSQLRow>into(windowFn)
+ .triggering(trigger)
+ .withAllowedLateness(allowedLatence)
+ .accumulatingFiredPanes());
+
+ PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = windowStream.apply("exGroupBy",
+ WithKeys
+ .of(new BeamAggregationTransform.AggregationGroupByKeyFn(windowFieldIdx, groupSet)));
+
+ PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream
+ .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create());
+
+ if (aggCalls.size() > 1) {
+ throw new BeamSqlUnsupportedException("only single aggregation is supported now.");
+ }
+
+ AggregateCall aggCall = aggCalls.get(0);
+ switch (aggCall.getAggregation().getName()) {
+ case "COUNT":
+ PCollection<KV<BeamSQLRow, Long>> aggregatedStream = groupedStream.apply("count",
+ Combine.<BeamSQLRow, BeamSQLRow, Long>groupedValues(Count.combineFn()));
+ PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord",
+ ParDo.of(new BeamAggregationTransform.MergeAggregationRecord(
+ BeamSQLRecordType.from(getRowType()), aggCall.getName())));
+ planCreator.pushUpstream(mergedStream);
+ break;
+ default:
+ // Only COUNT is supported for now; more aggregations will be added in BEAM-2008.
+ throw new BeamSqlUnsupportedException(
+ String.format("Unsupported aggregation [%s]", aggCall.getAggregation().getName()));
+ }
+
+ return planCreator.getPipeline();
+ }
+
+ @Override
+ public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator
+ , ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ return new BeamAggregationRel(getCluster(), traitSet, input, indicator
+ , groupSet, groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLatence);
+ }
+
+ public void setWindowFn(WindowFn windowFn) {
+ this.windowFn = windowFn;
+ }
+
+ public void setTrigger(Trigger trigger) {
+ this.trigger = trigger;
+ }
+
+ public RelWriter explainTerms(RelWriter pw) {
+ // We skip the "groups" element if it is a singleton of "group".
+ pw.item("group", groupSet)
+ .itemIf("window", windowFn, windowFn != null)
+ .itemIf("trigger", trigger, trigger != null)
+ .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
+ .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
+ .itemIf("indicator", indicator, indicator)
+ .itemIf("aggs", aggCalls, pw.nest());
+ if (!pw.nest()) {
+ for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
+ pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
+ }
+ }
+ return pw;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 477be5a..e1c5b3e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -56,14 +56,14 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
String stageName = BeamSQLRelUtils.getStageName(this);
- PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+ PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
PCollection<BeamSQLRow> projectStream = upstream.apply(stageName,
ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
- planCreator.setLatestStream(projectStream);
+ planCreator.pushUpstream(projectStream);
return planCreator.getPipeline();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index cad0b3c..f38b9e1 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -60,7 +60,7 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
String stageName = BeamSQLRelUtils.getStageName(this);
- PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+ PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index 6b1b6cd..3538273 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -52,7 +52,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName,
sourceTable.buildIOReader());
- planCreator.setLatestStream(sourceStream);
+ planCreator.pushUpstream(sourceStream);
return planCreator.getPipeline();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index 7e27ab3..65f5b20 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -67,14 +67,14 @@ public class BeamProjectRel extends Project implements BeamRelNode {
String stageName = BeamSQLRelUtils.getStageName(this);
- PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+ PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
.of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
- planCreator.setLatestStream(projectStream);
+ planCreator.pushUpstream(projectStream);
return planCreator.getPipeline();
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
new file mode 100644
index 0000000..249d02d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import com.google.common.collect.ImmutableList;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.InvalidFieldException;
+import org.apache.beam.dsls.sql.rel.BeamAggregationRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.joda.time.Duration;
+
+/**
+ * Rule to detect the window/trigger settings.
+ *
+ */
+public class BeamAggregationRule extends RelOptRule {
+ public static final BeamAggregationRule INSTANCE =
+ new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
+
+ public BeamAggregationRule(
+ Class<? extends Aggregate> aggregateClass,
+ Class<? extends Project> projectClass,
+ RelBuilderFactory relBuilderFactory) {
+ super(
+ operand(aggregateClass,
+ operand(projectClass, any())),
+ relBuilderFactory, null);
+ }
+
+ public BeamAggregationRule(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Aggregate aggregate = call.rel(0);
+ final Project project = call.rel(1);
+ updateWindowTrigger(call, aggregate, project);
+ }
+
+ private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate,
+ Project project) {
+ ImmutableBitSet groupByFields = aggregate.getGroupSet();
+ List<RexNode> projectMapping = project.getProjects();
+
+ WindowFn windowFn = new GlobalWindows();
+ Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow());
+ int windowFieldIdx = -1;
+ Duration allowedLatence = Duration.ZERO;
+
+ for (int groupField : groupByFields.asList()) {
+ RexNode projNode = projectMapping.get(groupField);
+ if (projNode instanceof RexCall) {
+ SqlOperator op = ((RexCall) projNode).op;
+ ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
+ String functionName = op.getName();
+ switch (functionName) {
+ case "TUMBLE":
+ windowFieldIdx = groupField;
+ windowFn = FixedWindows
+ .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
+ if (parameters.size() == 3) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+ }
+ break;
+ case "HOP":
+ windowFieldIdx = groupField;
+ windowFn = SlidingWindows
+ .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))))
+ .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2))));
+ if (parameters.size() == 4) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+ }
+ break;
+ case "SESSION":
+ windowFieldIdx = groupField;
+ windowFn = Sessions
+ .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
+ if (parameters.size() == 3) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(),
+ aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(aggregate.getInput(),
+ aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ aggregate.indicator,
+ aggregate.getGroupSet(),
+ aggregate.getGroupSets(),
+ aggregate.getAggCallList(),
+ windowFn,
+ triggerFn,
+ windowFieldIdx,
+ allowedLatence);
+ call.transformTo(newAggregator);
+ }
+
+ private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
+ return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
+ .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
+ }
+
+ private long getWindowParameterAsMillis(RexNode parameterNode) {
+ if (parameterNode instanceof RexLiteral) {
+ return RexLiteral.intValue(parameterNode);
+ } else {
+ throw new InvalidFieldException(String.format("[%s] is not valid.", parameterNode));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
index e4013bc..94531f0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
@@ -42,6 +42,11 @@ public class BeamSQLRecordType implements Serializable {
return record;
}
+ public void addField(String fieldName, SqlTypeName fieldType) {
+ fieldsName.add(fieldName);
+ fieldsType.add(fieldType);
+ }
+
public int size() {
return fieldsName.size();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
index f9dab8a..65f4a41 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
@@ -20,9 +20,14 @@ package org.apache.beam.dsls.sql.schema;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
+import java.util.GregorianCalendar;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.beam.dsls.sql.exception.InvalidFieldException;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Instant;
/**
* Represent a generic ROW record in Beam SQL.
@@ -34,6 +39,9 @@ public class BeamSQLRow implements Serializable {
private List<Object> dataValues;
private BeamSQLRecordType dataType;
+ private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+ private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+
public BeamSQLRow(BeamSQLRecordType dataType) {
this.dataType = dataType;
this.dataValues = new ArrayList<>();
@@ -50,6 +58,17 @@ public class BeamSQLRow implements Serializable {
}
}
+ public void updateWindowRange(BeamSQLRow upstreamRecord, BoundedWindow window){
+ windowStart = upstreamRecord.windowStart;
+ windowEnd = upstreamRecord.windowEnd;
+
+ if (window instanceof IntervalWindow) {
+ IntervalWindow iWindow = (IntervalWindow) window;
+ windowStart = iWindow.start();
+ windowEnd = iWindow.end();
+ }
+ }
+
public void addField(String fieldName, Object fieldValue) {
addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
}
@@ -107,6 +126,18 @@ public class BeamSQLRow implements Serializable {
String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
}
break;
+ case TIME:
+ if (!(fieldValue instanceof GregorianCalendar)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case TIMESTAMP:
+ if (!(fieldValue instanceof Date)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
default:
throw new UnsupportedDataTypeException(fieldType);
}
@@ -203,6 +234,20 @@ public class BeamSQLRow implements Serializable {
} else {
return fieldValue;
}
+ case TIME:
+ if (!(fieldValue instanceof GregorianCalendar)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ case TIMESTAMP:
+ if (!(fieldValue instanceof Date)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
default:
throw new UnsupportedDataTypeException(fieldType);
}
@@ -236,9 +281,26 @@ public class BeamSQLRow implements Serializable {
return nullFields;
}
+ public Instant getWindowStart() {
+ return windowStart;
+ }
+
+ public Instant getWindowEnd() {
+ return windowEnd;
+ }
+
+ public void setWindowStart(Instant windowStart) {
+ this.windowStart = windowStart;
+ }
+
+ public void setWindowEnd(Instant windowEnd) {
+ this.windowEnd = windowEnd;
+ }
+
@Override
public String toString() {
- return "BeamSQLRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
+ return "BeamSQLRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
+ + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index 9b2474a..3100ba5 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -20,12 +20,14 @@ package org.apache.beam.dsls.sql.schema;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Date;
import java.util.List;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -43,6 +45,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
private static final DoubleCoder doubleCoder = DoubleCoder.of();
+ private static final InstantCoder instantCoder = InstantCoder.of();
private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
private BeamSqlRowCoder(){}
@@ -74,8 +77,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
doubleCoder.encode(value.getDouble(idx), outStream, context.nested());
break;
case FLOAT:
- doubleCoder.encode(Double.parseDouble(
- String.valueOf(value.getFloat(idx))), outStream, context.nested());
+ doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested());
break;
case BIGINT:
longCoder.encode(value.getLong(idx), outStream, context.nested());
@@ -83,13 +85,17 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
case VARCHAR:
stringCoder.encode(value.getString(idx), outStream, context.nested());
break;
+ case TIMESTAMP:
+ longCoder.encode(value.getDate(idx).getTime(), outStream, context);
+ break;
default:
throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx));
}
}
- //add a dummy field to indicate the end of record
- intCoder.encode(value.size(), outStream, context);
+
+ instantCoder.encode(value.getWindowStart(), outStream, context.nested());
+ instantCoder.encode(value.getWindowEnd(), outStream, context);
}
@Override
@@ -128,12 +134,17 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
case VARCHAR:
record.addField(idx, stringCoder.decode(inStream, context.nested()));
break;
+ case TIMESTAMP:
+ record.addField(idx, new Date(longCoder.decode(inStream, context)));
+ break;
default:
throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
}
}
- intCoder.decode(inStream, context);
+
+ record.setWindowStart(instantCoder.decode(inStream, context.nested()));
+ record.setWindowEnd(instantCoder.decode(inStream, context));
return record;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransform.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransform.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransform.java
new file mode 100644
index 0000000..f478363
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransform.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.transform;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.joda.time.Instant;
+
+/**
+ * Collections of {@code PTransform} and {@code DoFn} used to perform GROUP-BY operation.
+ */
+public class BeamAggregationTransform implements Serializable{
+ /**
+ * Merge KV to single record.
+ */
+ public static class MergeAggregationRecord extends DoFn<KV<BeamSQLRow, Long>, BeamSQLRow> {
+ private BeamSQLRecordType outRecordType;
+ private String aggFieldName;
+
+ public MergeAggregationRecord(BeamSQLRecordType outRecordType, String aggFieldName) {
+ this.outRecordType = outRecordType;
+ this.aggFieldName = aggFieldName;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ BeamSQLRow outRecord = new BeamSQLRow(outRecordType);
+ outRecord.updateWindowRange(c.element().getKey(), window);
+
+ KV<BeamSQLRow, Long> kvRecord = c.element();
+ for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
+ outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
+ }
+ outRecord.addField(aggFieldName, kvRecord.getValue());
+
+// if (c.pane().isLast()) {
+ c.output(outRecord);
+// }
+ }
+ }
+
+ /**
+ * extract group-by fields.
+ */
+ public static class AggregationGroupByKeyFn
+ implements SerializableFunction<BeamSQLRow, BeamSQLRow> {
+ private List<Integer> groupByKeys;
+
+ public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
+ this.groupByKeys = new ArrayList<>();
+ for (int i : groupSet.asList()) {
+ if (i != windowFieldIdx) {
+ groupByKeys.add(i);
+ }
+ }
+ }
+
+ @Override
+ public BeamSQLRow apply(BeamSQLRow input) {
+ BeamSQLRecordType typeOfKey = exTypeOfKeyRecord(input.getDataType());
+ BeamSQLRow keyOfRecord = new BeamSQLRow(typeOfKey);
+ keyOfRecord.updateWindowRange(input, null);
+
+ for (int idx = 0; idx < groupByKeys.size(); ++idx) {
+ keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
+ }
+ return keyOfRecord;
+ }
+
+ private BeamSQLRecordType exTypeOfKeyRecord(BeamSQLRecordType dataType) {
+ BeamSQLRecordType typeOfKey = new BeamSQLRecordType();
+ for (int idx : groupByKeys) {
+ typeOfKey.addField(dataType.getFieldsName().get(idx), dataType.getFieldsType().get(idx));
+ }
+ return typeOfKey;
+ }
+
+ }
+
+ /**
+ * Assign event timestamp.
+ */
+ public static class WindowTimestampFn implements SerializableFunction<BeamSQLRow, Instant> {
+ private int windowFieldIdx = -1;
+
+ public WindowTimestampFn(int windowFieldIdx) {
+ super();
+ this.windowFieldIdx = windowFieldIdx;
+ }
+
+ @Override
+ public Instant apply(BeamSQLRow input) {
+ return new Instant(input.getDate(windowFieldIdx).getTime());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
index d018057..79dd67f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
@@ -23,6 +23,7 @@ import org.apache.beam.dsls.sql.rel.BeamProjectRel;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
/**
*
@@ -48,10 +49,13 @@ public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> {
}
@ProcessElement
- public void processElement(ProcessContext c) {
- List<Object> results = executor.execute(c.element());
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ BeamSQLRow inputRecord = c.element();
+ List<Object> results = executor.execute(inputRecord);
BeamSQLRow outRow = new BeamSQLRow(outputRecordType);
+ outRow.updateWindowRange(inputRecord, window);
+
for (int idx = 0; idx < results.size(); ++idx) {
outRow.addField(idx, results.get(idx));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
index 625fb71..055a4d4 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -18,17 +18,18 @@
package org.apache.beam.dsls.sql.planner;
import java.util.Arrays;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
/**
@@ -36,50 +37,60 @@ import org.junit.BeforeClass;
*
*/
public class BasePlanner {
- public static BeamSqlRunner runner;
+ public static BeamSqlRunner runner = new BeamSqlRunner();
@BeforeClass
public static void prepare() {
- runner = new BeamSqlRunner();
-
runner.addTable("ORDER_DETAILS", getTable());
runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
runner.addTable("SUB_ORDER_RAM", getTable());
}
- @AfterClass
- public static void close(){
- runner = null;
- }
-
private static BaseBeamTable getTable() {
final RelProtoDataType protoRowType = new RelProtoDataType() {
@Override
public RelDataType apply(RelDataTypeFactory a0) {
- return a0.builder()
- .add("order_id", SqlTypeName.BIGINT)
- .add("site_id", SqlTypeName.INTEGER)
- .add("price", SqlTypeName.DOUBLE)
- .add("shipping", SqlTypeName.FLOAT)
- .add("notes", SqlTypeName.VARCHAR)
- .build();
+ return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
}
};
- return new MockedBeamSQLTable(protoRowType);
+ BeamSQLRecordType dataType = BeamSQLRecordType.from(
+ protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
+ BeamSQLRow row1 = new BeamSQLRow(dataType);
+ row1.addField(0, 12345L);
+ row1.addField(1, 0);
+ row1.addField(2, 10.5);
+ row1.addField(3, new Date());
+
+ BeamSQLRow row2 = new BeamSQLRow(dataType);
+ row2.addField(0, 12345L);
+ row2.addField(1, 1);
+ row2.addField(2, 20.5);
+ row2.addField(3, new Date());
+
+ BeamSQLRow row3 = new BeamSQLRow(dataType);
+ row3.addField(0, 12345L);
+ row3.addField(1, 0);
+ row3.addField(2, 20.5);
+ row3.addField(3, new Date());
+
+ BeamSQLRow row4 = new BeamSQLRow(dataType);
+ row4.addField(0, null);
+ row4.addField(1, null);
+ row4.addField(2, 20.5);
+ row4.addField(3, new Date());
+
+ return new MockedBeamSQLTable(protoRowType).withInputRecords(
+ Arrays.asList(row1, row2, row3, row4));
}
public static BaseBeamTable getTable(String bootstrapServer, String topic) {
final RelProtoDataType protoRowType = new RelProtoDataType() {
@Override
public RelDataType apply(RelDataTypeFactory a0) {
- return a0.builder()
- .add("order_id", SqlTypeName.BIGINT)
- .add("site_id", SqlTypeName.INTEGER)
- .add("price", SqlTypeName.DOUBLE)
- .add("shipping", SqlTypeName.FLOAT)
- .add("notes", SqlTypeName.VARCHAR)
- .build();
+ return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
new file mode 100644
index 0000000..566c574
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import org.junit.Test;
+
+/**
+ * Test group-by methods.
+ *
+ */
+public class BeamGroupByExplainTest extends BasePlanner {
+
+ /**
+ * GROUP-BY without window operation, and grouped fields.
+ */
+ @Test
+ public void testSimpleGroupExplain() throws Exception {
+ String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 ";
+ String plan = runner.explainQuery(sql);
+ }
+
+ /**
+ * GROUP-BY without window operation, and grouped fields.
+ */
+ @Test
+ public void testSimpleGroup2Explain() throws Exception {
+ String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
+ String plan = runner.explainQuery(sql);
+ }
+
+ /**
+ * GROUP-BY with TUMBLE window.
+ */
+ @Test
+ public void testTumbleExplain() throws Exception {
+ String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
+ String plan = runner.explainQuery(sql);
+ }
+
+ /**
+ * GROUP-BY with TUMBLE window.
+ */
+ @Test
+ public void testTumbleWithDelayExplain() throws Exception {
+ String sql = "SELECT order_id, site_id, "
+ + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
+ + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
+ + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
+ String plan = runner.explainQuery(sql);
+ }
+
+ /**
+ * GROUP-BY with HOP window.
+ */
+ @Test
+ public void testHopExplain() throws Exception {
+ String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
+ String plan = runner.explainQuery(sql);
+ }
+
+ /**
+ * GROUP-BY with SESSION window.
+ */
+ @Test
+ public void testSessionExplain() throws Exception {
+ String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ + ", SESSION(order_time, INTERVAL '5' MINUTE)";
+ String plan = runner.explainQuery(sql);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
new file mode 100644
index 0000000..d5f8125
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import org.apache.beam.sdk.Pipeline;
+import org.junit.Test;
+
+/**
+ * Test group-by methods.
+ *
+ */
+public class BeamGroupByPipelineTest extends BasePlanner {
+
+ /**
+ * GROUP-BY without window operation, and grouped fields.
+ */
+ @Test
+ public void testSimpleGroupExplain() throws Exception {
+ String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 ";
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+ }
+
+ /**
+ * GROUP-BY without window operation, and grouped fields.
+ */
+ @Test
+ public void testSimpleGroup2Explain() throws Exception {
+ String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+ }
+
+ /**
+ * GROUP-BY with TUMBLE window.
+ */
+ @Test
+ public void testTumbleExplain() throws Exception {
+ String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+ }
+
+ /**
+ * GROUP-BY with TUMBLE window.
+ */
+ @Test
+ public void testTumbleWithDelayExplain() throws Exception {
+ String sql = "SELECT order_id, site_id, "
+ + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
+ + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
+ + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+ }
+
+ /**
+ * GROUP-BY with HOP window.
+ */
+ @Test
+ public void testHopExplain() throws Exception {
+ String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+ }
+
+ /**
+ * GROUP-BY with SESSION window.
+ */
+ @Test
+ public void testSessionExplain() throws Exception {
+ String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+ + ", SESSION(order_time, INTERVAL '5' MINUTE)";
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
new file mode 100644
index 0000000..83ab871
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import org.apache.calcite.tools.ValidationException;
+import org.junit.Test;
+
+/**
+ * Test group-by methods.
+ *
+ */
+public class BeamInvalidGroupByTest extends BasePlanner {
+
+ @Test(expected = ValidationException.class)
+ public void testTumble2Explain() throws Exception {
+ String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 " + "GROUP BY order_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
+ String plan = runner.explainQuery(sql);
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testTumble3Explain() throws Exception {
+ String sql = "SELECT order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
+ + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
+ + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
+ String plan = runner.explainQuery(sql);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
new file mode 100644
index 0000000..39a5614
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests to execute a query.
+ *
+ */
+public class BeamPlannerAggregationSubmitTest {
+ public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ public static BeamSqlRunner runner = new BeamSqlRunner();
+
+ @BeforeClass
+ public static void prepare() throws ParseException {
+ runner.addTable("ORDER_DETAILS", getOrderTable());
+ runner.addTable("ORDER_SUMMARY", getSummaryTable());
+ }
+
+ private static BaseBeamTable getOrderTable() throws ParseException {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder().add("order_id", SqlTypeName.BIGINT)
+ .add("site_id", SqlTypeName.INTEGER)
+ .add("order_time", SqlTypeName.TIMESTAMP).build();
+ }
+ };
+
+ BeamSQLRecordType dataType = BeamSQLRecordType.from(
+ protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
+ BeamSQLRow row1 = new BeamSQLRow(dataType);
+ row1.addField(0, 12345L);
+ row1.addField(1, 1);
+ row1.addField(2, format.parse("2017-01-01 01:02:03"));
+
+ BeamSQLRow row2 = new BeamSQLRow(dataType);
+ row2.addField(0, 12345L);
+ row2.addField(1, 0);
+ row2.addField(2, format.parse("2017-01-01 01:03:04"));
+
+ BeamSQLRow row3 = new BeamSQLRow(dataType);
+ row3.addField(0, 12345L);
+ row3.addField(1, 0);
+ row3.addField(2, format.parse("2017-01-01 02:03:04"));
+
+ BeamSQLRow row4 = new BeamSQLRow(dataType);
+ row4.addField(0, 2132L);
+ row4.addField(1, 0);
+ row4.addField(2, format.parse("2017-01-01 03:04:05"));
+
+ return new MockedBeamSQLTable(protoRowType).withInputRecords(
+ Arrays.asList(row1
+ , row2, row3, row4
+ ));
+
+ }
+
+ private static BaseBeamTable getSummaryTable() {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder()
+ .add("site_id", SqlTypeName.INTEGER)
+ .add("agg_hour", SqlTypeName.TIMESTAMP)
+ .add("size", SqlTypeName.BIGINT).build();
+ }
+ };
+ return new MockedBeamSQLTable(protoRowType);
+ }
+
+
+ @Test
+ public void selectWithWindowAggregation() throws Exception{
+ String sql = "INSERT INTO ORDER_SUMMARY(SITE_ID, agg_hour, SIZE) "
+ + "SELECT site_id, TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
+ + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 1 " + "GROUP BY site_id"
+ + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
+
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+
+ pipeline.run().waitUntilFinish();
+
+ Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
+ BeamSQLRow result = MockedBeamSQLTable.CONTENT.get(0);
+ Assert.assertEquals(1, result.getInteger(0));
+ Assert.assertEquals(format.parse("2017-01-01 01:00:00"), result.getDate(1));
+ Assert.assertEquals(1L, result.getLong(2));
+ }
+
+ @Test
+ public void selectWithoutWindowAggregation() throws Exception{
+ String sql = "INSERT INTO ORDER_SUMMARY(SITE_ID, SIZE) "
+ + "SELECT site_id, COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
+
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+
+ pipeline.run().waitUntilFinish();
+
+ Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
+ Assert.assertEquals("site_id=0,agg_hour=null,size=3",
+ MockedBeamSQLTable.CONTENT.get(0).valueInString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
index 5d1052b..9b6b20a 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
@@ -32,7 +32,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
String plan = runner.explainQuery(sql);
String expectedPlan =
- "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], shipping=[$3], notes=[$4])\n"
+ "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n"
+ " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
Assert.assertEquals("explain doesn't match", expectedPlan, plan);
}
@@ -58,12 +58,10 @@ public class BeamPlannerExplainTest extends BasePlanner {
String expectedPlan =
"BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n"
- + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], shipping=[null],"
- + " notes=[null])\n"
+ + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n"
+ " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+ " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+ " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
Assert.assertEquals("explain doesn't match", expectedPlan, plan);
}
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
index 1ca9eb3..5435049 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.dsls.sql.planner;
+import org.apache.beam.sdk.Pipeline;
import org.junit.Assert;
import org.junit.Test;
@@ -28,13 +29,14 @@ public class BeamPlannerSubmitTest extends BasePlanner {
@Test
public void insertSelectFilter() throws Exception {
String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
- + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
+ + " order_id, site_id, price "
+ + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
- runner.submitQuery(sql);
+ pipeline.run().waitUntilFinish();
Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
- Assert.assertEquals("order_id=12345,site_id=0,price=20.5,shipping=null,notes=null",
- MockedBeamSQLTable.CONTENT.get(0));
+ Assert.assertTrue(MockedBeamSQLTable.CONTENT.get(0).valueInString()
+ .contains("order_id=12345,site_id=0,price=20.5,order_time="));
}
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
index 538607f..611bd73 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -19,7 +19,6 @@ package org.apache.beam.dsls.sql.planner;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamIOType;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -38,12 +37,19 @@ import org.apache.calcite.rel.type.RelProtoDataType;
*/
public class MockedBeamSQLTable extends BaseBeamTable {
- public static final List<String> CONTENT = new ArrayList<>();
+ public static final List<BeamSQLRow> CONTENT = new ArrayList<>();
+
+ private List<BeamSQLRow> inputRecords;
public MockedBeamSQLTable(RelProtoDataType protoRowType) {
super(protoRowType);
}
+ public MockedBeamSQLTable withInputRecords(List<BeamSQLRow> inputRecords){
+ this.inputRecords = inputRecords;
+ return this;
+ }
+
@Override
public BeamIOType getSourceType() {
return BeamIOType.UNBOUNDED;
@@ -51,31 +57,7 @@ public class MockedBeamSQLTable extends BaseBeamTable {
@Override
public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
- BeamSQLRow row1 = new BeamSQLRow(beamSqlRecordType);
- row1.addField(0, 12345L);
- row1.addField(1, 0);
- row1.addField(2, 10.5);
- row1.addField(3, 123.4f);
-
- BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType);
- row2.addField(0, 12345L);
- row2.addField(1, 1);
- row2.addField(2, 20.5);
- row2.addField(3, 234.5f);
-
- BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType);
- row3.addField(0, 12345L);
- row3.addField(1, 0);
- row3.addField(2, 20.5);
- row3.addField(3, 345.6f);
-
- BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType);
- row4.addField(0, null);
- row4.addField(1, null);
- row4.addField(2, 20.5);
- row4.addField(3, 456.7f);
-
- return Create.of(row1, row2, row3);
+ return Create.of(inputRecords);
}
@Override
@@ -100,7 +82,7 @@ public class MockedBeamSQLTable extends BaseBeamTable {
@ProcessElement
public void processElement(ProcessContext c) {
- CONTENT.add(c.element().valueInString());
+ CONTENT.add(c.element());
}
@Teardown
[2/2] beam git commit: This closes #3039
Posted by dh...@apache.org.
This closes #3039
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6729a027
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6729a027
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6729a027
Branch: refs/heads/DSL_SQL
Commit: 6729a027daea04d0ad38f8a83ad1baafa465169f
Parents: 7e918a7 3c6d60f
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 10 10:03:17 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 10 10:03:17 2017 -0700
----------------------------------------------------------------------
dsls/sql/pom.xml | 6 +-
.../dsls/sql/interpreter/BeamSQLFnExecutor.java | 20 ++-
.../interpreter/operator/BeamSqlPrimitive.java | 10 ++
.../operator/BeamSqlWindowEndExpression.java | 42 +++++
.../operator/BeamSqlWindowExpression.java | 50 ++++++
.../operator/BeamSqlWindowStartExpression.java | 43 +++++
.../dsls/sql/planner/BeamPipelineCreator.java | 17 +-
.../beam/dsls/sql/planner/BeamRuleSets.java | 4 +-
.../beam/dsls/sql/rel/BeamAggregationRel.java | 154 ++++++++++++++++++
.../apache/beam/dsls/sql/rel/BeamFilterRel.java | 4 +-
.../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 2 +-
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 2 +-
.../beam/dsls/sql/rel/BeamProjectRel.java | 4 +-
.../beam/dsls/sql/rule/BeamAggregationRule.java | 163 +++++++++++++++++++
.../beam/dsls/sql/schema/BeamSQLRecordType.java | 5 +
.../apache/beam/dsls/sql/schema/BeamSQLRow.java | 64 +++++++-
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 21 ++-
.../sql/transform/BeamAggregationTransform.java | 120 ++++++++++++++
.../dsls/sql/transform/BeamSQLProjectFn.java | 8 +-
.../beam/dsls/sql/planner/BasePlanner.java | 61 ++++---
.../sql/planner/BeamGroupByExplainTest.java | 93 +++++++++++
.../sql/planner/BeamGroupByPipelineTest.java | 94 +++++++++++
.../sql/planner/BeamInvalidGroupByTest.java | 44 +++++
.../BeamPlannerAggregationSubmitTest.java | 136 ++++++++++++++++
.../sql/planner/BeamPlannerExplainTest.java | 6 +-
.../dsls/sql/planner/BeamPlannerSubmitTest.java | 12 +-
.../dsls/sql/planner/MockedBeamSQLTable.java | 38 ++---
27 files changed, 1137 insertions(+), 86 deletions(-)
----------------------------------------------------------------------