You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/10 17:03:26 UTC

[1/2] beam git commit: [BEAM-2006] window support Add support for aggregation: global, HOP, TUMBLE, SESSION, only aggregation function COUNT

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 7e918a76e -> 6729a027d


[BEAM-2006] window support
Add support for aggregation: global, HOP, TUMBLE, SESSION; the only supported aggregation function is COUNT

fix typo


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c6d60fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c6d60fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c6d60fb

Branch: refs/heads/DSL_SQL
Commit: 3c6d60fb2f473b1860cfeca0f740ec817499d085
Parents: 7e918a7
Author: mingmxu <mi...@ebay.com>
Authored: Tue May 9 23:04:17 2017 -0700
Committer: mingmxu <mi...@ebay.com>
Committed: Tue May 9 23:15:58 2017 -0700

----------------------------------------------------------------------
 dsls/sql/pom.xml                                |   6 +-
 .../dsls/sql/interpreter/BeamSQLFnExecutor.java |  20 ++-
 .../interpreter/operator/BeamSqlPrimitive.java  |  10 ++
 .../operator/BeamSqlWindowEndExpression.java    |  42 +++++
 .../operator/BeamSqlWindowExpression.java       |  50 ++++++
 .../operator/BeamSqlWindowStartExpression.java  |  43 +++++
 .../dsls/sql/planner/BeamPipelineCreator.java   |  17 +-
 .../beam/dsls/sql/planner/BeamRuleSets.java     |   4 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   | 154 ++++++++++++++++++
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |   4 +-
 .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java |   2 +-
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |   2 +-
 .../beam/dsls/sql/rel/BeamProjectRel.java       |   4 +-
 .../beam/dsls/sql/rule/BeamAggregationRule.java | 163 +++++++++++++++++++
 .../beam/dsls/sql/schema/BeamSQLRecordType.java |   5 +
 .../apache/beam/dsls/sql/schema/BeamSQLRow.java |  64 +++++++-
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  21 ++-
 .../sql/transform/BeamAggregationTransform.java | 120 ++++++++++++++
 .../dsls/sql/transform/BeamSQLProjectFn.java    |   8 +-
 .../beam/dsls/sql/planner/BasePlanner.java      |  61 ++++---
 .../sql/planner/BeamGroupByExplainTest.java     |  93 +++++++++++
 .../sql/planner/BeamGroupByPipelineTest.java    |  94 +++++++++++
 .../sql/planner/BeamInvalidGroupByTest.java     |  44 +++++
 .../BeamPlannerAggregationSubmitTest.java       | 136 ++++++++++++++++
 .../sql/planner/BeamPlannerExplainTest.java     |   6 +-
 .../dsls/sql/planner/BeamPlannerSubmitTest.java |  12 +-
 .../dsls/sql/planner/MockedBeamSQLTable.java    |  38 ++---
 27 files changed, 1137 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index 15692e9..bc658e6 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -34,7 +34,7 @@
   <properties>
     <timestamp>${maven.build.timestamp}</timestamp>
     <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
-    <calcite-version>1.11.0</calcite-version>
+    <calcite-version>1.12.0</calcite-version>
   </properties>
 
   <build>
@@ -209,5 +209,9 @@
       <artifactId>commons-csv</artifactId>
       <version>1.4</version>
     </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
index a14d347..4ae7b33 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
@@ -34,6 +34,9 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
@@ -159,8 +162,21 @@ public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
 
         case "IS NULL":
           return new BeamSqlIsNullExpression(subExps.get(0));
-        case "IS NOT NULL":
-          return new BeamSqlIsNotNullExpression(subExps.get(0));
+      case "IS NOT NULL":
+        return new BeamSqlIsNotNullExpression(subExps.get(0));
+
+      case "HOP":
+      case "TUMBLE":
+      case "SESSION":
+        return new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+      case "HOP_START":
+      case "TUMBLE_START":
+      case "SESSION_START":
+        return new BeamSqlWindowStartExpression();
+      case "HOP_END":
+      case "TUMBLE_END":
+      case "SESSION_END":
+        return new BeamSqlWindowEndExpression();
       default:
         throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index dbe6c3c..3309577 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -18,6 +18,8 @@
 package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
 import java.util.List;
 import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
@@ -89,6 +91,14 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression{
     case CHAR:
     case VARCHAR:
       return value instanceof String || value instanceof NlsString;
+    case TIME:
+      return value instanceof GregorianCalendar;
+    case TIMESTAMP:
+      return value instanceof Date;
+    case INTERVAL_HOUR:
+      return value instanceof BigDecimal;
+    case INTERVAL_MINUTE:
+      return value instanceof BigDecimal;
     default:
       throw new BeamSqlUnsupportedException(outputType.name());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
new file mode 100644
index 0000000..96ad81f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operation.
+ *
+ * <p>These operators return the <em>end</em> timestamp of the window.
+ */
+public class BeamSqlWindowEndExpression extends BeamSqlExpression {
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+        new Date(inputRecord.getWindowEnd().getMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
new file mode 100644
index 0000000..2fb9a48
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operation.
+ *
+ * <p>These functions don't change the timestamp value; instead, they indicate which
+ * field is the event_timestamp and how the window is defined.
+ */
+public class BeamSqlWindowExpression extends BeamSqlExpression {
+
+  public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override
+  public boolean accept() {
+    return operands.get(0).getOutputType().equals(SqlTypeName.DATE)
+        || operands.get(0).getOutputType().equals(SqlTypeName.TIME)
+        || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP);
+  }
+
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+        (Date) operands.get(0).evaluate(inputRecord).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
new file mode 100644
index 0000000..d0ac260
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START},
+ * {@code SESSION_START} operation.
+ *
+ * <p>These operators return the <em>start</em> timestamp of the window.
+ */
+public class BeamSqlWindowStartExpression extends BeamSqlExpression {
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+        new Date(inputRecord.getWindowStart().getMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
index 00274a2..1d7cfd1 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
@@ -19,6 +19,9 @@ package org.apache.beam.dsls.sql.planner;
 
 import java.util.Map;
 
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
@@ -38,7 +41,7 @@ import org.apache.beam.sdk.values.PCollection;
  */
 public class BeamPipelineCreator {
   private Map<String, BaseBeamTable> sourceTables;
-  private PCollection<BeamSQLRow> latestStream;
+  private Queue<PCollection<BeamSQLRow>> upStreamQueue;
 
   private PipelineOptions options;
 
@@ -53,18 +56,20 @@ public class BeamPipelineCreator {
         .as(PipelineOptions.class); // FlinkPipelineOptions.class
     options.setJobName("BeamPlanCreator");
 
+    upStreamQueue = new ConcurrentLinkedQueue<>();
+
     pipeline = Pipeline.create(options);
     CoderRegistry cr = pipeline.getCoderRegistry();
     cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
     cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());
   }
 
-  public PCollection<BeamSQLRow> getLatestStream() {
-    return latestStream;
+  public PCollection<BeamSQLRow> popUpstream() {
+    return upStreamQueue.poll();
   }
 
-  public void setLatestStream(PCollection<BeamSQLRow> latestStream) {
-    this.latestStream = latestStream;
+  public void pushUpstream(PCollection<BeamSQLRow> upstream) {
+    this.upStreamQueue.add(upstream);
   }
 
   public Map<String, BaseBeamTable> getSourceTables() {
@@ -75,7 +80,7 @@ public class BeamPipelineCreator {
     return pipeline;
   }
 
-  public boolean isHasPersistent() {
+  public boolean hasPersistent() {
     return hasPersistent;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
index 2af31dc..acbd43f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet;
 import java.util.Iterator;
 
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.rule.BeamAggregationRule;
 import org.apache.beam.dsls.sql.rule.BeamFilterRule;
 import org.apache.beam.dsls.sql.rule.BeamIOSinkRule;
 import org.apache.beam.dsls.sql.rule.BeamIOSourceRule;
@@ -38,7 +39,8 @@ import org.apache.calcite.tools.RuleSet;
 public class BeamRuleSets {
   private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet
       .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
-          BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE)
+          BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
+          BeamAggregationRule.INSTANCE)
       .build();
 
   public static RuleSet[] getRuleSets() {

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
new file mode 100644
index 0000000..2c7626d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.transform.BeamAggregationTransform;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.joda.time.Duration;
+
+/**
+ * {@link BeamRelNode} to replace a {@link Aggregate} node.
+ *
+ */
+public class BeamAggregationRel extends Aggregate implements BeamRelNode {
+  private int windowFieldIdx = -1;
+  private WindowFn<BeamSQLRow, BoundedWindow> windowFn;
+  private Trigger trigger;
+  private Duration allowedLatence = Duration.ZERO;
+
+  public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits
+      , RelNode child, boolean indicator,
+      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls
+      , WindowFn windowFn, Trigger trigger, int windowFieldIdx, Duration allowedLatence) {
+    super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+    this.windowFn = windowFn;
+    this.trigger = trigger;
+    this.windowFieldIdx = windowFieldIdx;
+    this.allowedLatence = allowedLatence;
+  }
+
+  @Override
+  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+    RelNode input = getInput();
+    BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+
+    String stageName = BeamSQLRelUtils.getStageName(this);
+
+    PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
+    if (windowFieldIdx != -1) {
+      upstream = upstream.apply("assignEventTimestamp", WithTimestamps
+          .<BeamSQLRow>of(new BeamAggregationTransform.WindowTimestampFn(windowFieldIdx)));
+    }
+
+    PCollection<BeamSQLRow> windowStream = upstream.apply("window",
+        Window.<BeamSQLRow>into(windowFn)
+        .triggering(trigger)
+        .withAllowedLateness(allowedLatence)
+        .accumulatingFiredPanes());
+
+    PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = windowStream.apply("exGroupBy",
+        WithKeys
+            .of(new BeamAggregationTransform.AggregationGroupByKeyFn(windowFieldIdx, groupSet)));
+
+    PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream
+        .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create());
+
+    if (aggCalls.size() > 1) {
+      throw new BeamSqlUnsupportedException("only single aggregation is supported now.");
+    }
+
+    AggregateCall aggCall = aggCalls.get(0);
+    switch (aggCall.getAggregation().getName()) {
+    case "COUNT":
+      PCollection<KV<BeamSQLRow, Long>> aggregatedStream = groupedStream.apply("count",
+          Combine.<BeamSQLRow, BeamSQLRow, Long>groupedValues(Count.combineFn()));
+      PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord",
+          ParDo.of(new BeamAggregationTransform.MergeAggregationRecord(
+              BeamSQLRecordType.from(getRowType()), aggCall.getName())));
+      planCreator.pushUpstream(mergedStream);
+      break;
+    default:
+      //Only COUNT is supported now; more will be added in BEAM-2008
+      throw new BeamSqlUnsupportedException(
+          String.format("Unsupported aggregation [%s]", aggCall.getAggregation().getName()));
+    }
+
+    return planCreator.getPipeline();
+  }
+
+  @Override
+  public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator
+      , ImmutableBitSet groupSet,
+      List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+    return new BeamAggregationRel(getCluster(), traitSet, input, indicator
+        , groupSet, groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLatence);
+  }
+
+  public void setWindowFn(WindowFn windowFn) {
+    this.windowFn = windowFn;
+  }
+
+  public void setTrigger(Trigger trigger) {
+    this.trigger = trigger;
+  }
+
+  public RelWriter explainTerms(RelWriter pw) {
+    // We skip the "groups" element if it is a singleton of "group".
+    pw.item("group", groupSet)
+        .itemIf("window", windowFn, windowFn != null)
+        .itemIf("trigger", trigger, trigger != null)
+        .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
+        .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
+        .itemIf("indicator", indicator, indicator)
+        .itemIf("aggs", aggCalls, pw.nest());
+    if (!pw.nest()) {
+      for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
+        pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
+      }
+    }
+    return pw;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 477be5a..e1c5b3e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -56,14 +56,14 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
 
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+    PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
 
     BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
 
     PCollection<BeamSQLRow> projectStream = upstream.apply(stageName,
         ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
 
-    planCreator.setLatestStream(projectStream);
+    planCreator.pushUpstream(projectStream);
 
     return planCreator.getPipeline();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index cad0b3c..f38b9e1 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -60,7 +60,7 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
 
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+    PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
 
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index 6b1b6cd..3538273 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -52,7 +52,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
     PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName,
         sourceTable.buildIOReader());
 
-    planCreator.setLatestStream(sourceStream);
+    planCreator.pushUpstream(sourceStream);
 
     return planCreator.getPipeline();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index 7e27ab3..65f5b20 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -67,14 +67,14 @@ public class BeamProjectRel extends Project implements BeamRelNode {
 
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+    PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
 
     BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
 
     PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
         .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
 
-    planCreator.setLatestStream(projectStream);
+    planCreator.pushUpstream(projectStream);
 
     return planCreator.getPipeline();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
new file mode 100644
index 0000000..249d02d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import com.google.common.collect.ImmutableList;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.InvalidFieldException;
+import org.apache.beam.dsls.sql.rel.BeamAggregationRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.joda.time.Duration;
+
+/**
+ * Rule to detect the window/trigger settings.
+ *
+ */
+public class BeamAggregationRule extends RelOptRule {
+  public static final BeamAggregationRule INSTANCE =
+      new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
+
+  public BeamAggregationRule(
+      Class<? extends Aggregate> aggregateClass,
+      Class<? extends Project> projectClass,
+      RelBuilderFactory relBuilderFactory) {
+    super(
+        operand(aggregateClass,
+            operand(projectClass, any())),
+        relBuilderFactory, null);
+  }
+
+  public BeamAggregationRule(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate aggregate = call.rel(0);
+    final Project project = call.rel(1);
+    updateWindowTrigger(call, aggregate, project);
+  }
+
+  private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate,
+      Project project) {
+    ImmutableBitSet groupByFields = aggregate.getGroupSet();
+    List<RexNode> projectMapping = project.getProjects();
+
+    WindowFn windowFn = new GlobalWindows();
+    Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow());
+    int windowFieldIdx = -1;
+    Duration allowedLatence = Duration.ZERO;
+
+    for (int groupField : groupByFields.asList()) {
+      RexNode projNode = projectMapping.get(groupField);
+      if (projNode instanceof RexCall) {
+        SqlOperator op = ((RexCall) projNode).op;
+        ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
+        String functionName = op.getName();
+        switch (functionName) {
+        case "TUMBLE":
+          windowFieldIdx = groupField;
+          windowFn = FixedWindows
+              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
+          if (parameters.size() == 3) {
+            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+                .getValue();
+            triggerFn = createTriggerWithDelay(delayTime);
+            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+          }
+          break;
+        case "HOP":
+          windowFieldIdx = groupField;
+          windowFn = SlidingWindows
+              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))))
+              .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2))));
+          if (parameters.size() == 4) {
+            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
+                .getValue();
+            triggerFn = createTriggerWithDelay(delayTime);
+            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+          }
+          break;
+        case "SESSION":
+          windowFieldIdx = groupField;
+          windowFn = Sessions
+              .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
+          if (parameters.size() == 3) {
+            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+                .getValue();
+            triggerFn = createTriggerWithDelay(delayTime);
+            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+          }
+          break;
+        default:
+          break;
+        }
+      }
+    }
+
+    BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(),
+        aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(aggregate.getInput(),
+            aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        aggregate.indicator,
+        aggregate.getGroupSet(),
+        aggregate.getGroupSets(),
+        aggregate.getAggCallList(),
+        windowFn,
+        triggerFn,
+        windowFieldIdx,
+        allowedLatence);
+    call.transformTo(newAggregator);
+  }
+
+  private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
+    return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
+        .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
+  }
+
+  private long getWindowParameterAsMillis(RexNode parameterNode) {
+    if (parameterNode instanceof RexLiteral) {
+      return RexLiteral.intValue(parameterNode);
+    } else {
+      throw new InvalidFieldException(String.format("[%s] is not valid.", parameterNode));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
index e4013bc..94531f0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
@@ -42,6 +42,11 @@ public class BeamSQLRecordType implements Serializable {
     return record;
   }
 
+  public void addField(String fieldName, SqlTypeName fieldType) {
+    fieldsName.add(fieldName);
+    fieldsType.add(fieldType);
+  }
+
   public int size() {
     return fieldsName.size();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
index f9dab8a..65f4a41 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
@@ -20,9 +20,14 @@ package org.apache.beam.dsls.sql.schema;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.GregorianCalendar;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.dsls.sql.exception.InvalidFieldException;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Instant;
 
 /**
 * Represent a generic ROW record in Beam SQL.
@@ -34,6 +39,9 @@ public class BeamSQLRow implements Serializable {
   private List<Object> dataValues;
   private BeamSQLRecordType dataType;
 
+  private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+  private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+
   public BeamSQLRow(BeamSQLRecordType dataType) {
     this.dataType = dataType;
     this.dataValues = new ArrayList<>();
@@ -50,6 +58,17 @@ public class BeamSQLRow implements Serializable {
     }
   }
 
+  public void updateWindowRange(BeamSQLRow upstreamRecord, BoundedWindow window){
+    windowStart = upstreamRecord.windowStart;
+    windowEnd = upstreamRecord.windowEnd;
+
+    if (window instanceof IntervalWindow) {
+      IntervalWindow iWindow = (IntervalWindow) window;
+      windowStart = iWindow.start();
+      windowEnd = iWindow.end();
+    }
+  }
+
   public void addField(String fieldName, Object fieldValue) {
     addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
   }
@@ -107,6 +126,18 @@ public class BeamSQLRow implements Serializable {
             String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
       }
       break;
+    case TIME:
+      if (!(fieldValue instanceof GregorianCalendar)) {
+        throw new InvalidFieldException(
+            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+      }
+      break;
+    case TIMESTAMP:
+      if (!(fieldValue instanceof Date)) {
+        throw new InvalidFieldException(
+            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+      }
+      break;
     default:
       throw new UnsupportedDataTypeException(fieldType);
     }
@@ -203,6 +234,20 @@ public class BeamSQLRow implements Serializable {
       } else {
         return fieldValue;
       }
+    case TIME:
+      if (!(fieldValue instanceof GregorianCalendar)) {
+        throw new InvalidFieldException(
+            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+      } else {
+        return fieldValue;
+      }
+    case TIMESTAMP:
+      if (!(fieldValue instanceof Date)) {
+        throw new InvalidFieldException(
+            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+      } else {
+        return fieldValue;
+      }
     default:
       throw new UnsupportedDataTypeException(fieldType);
     }
@@ -236,9 +281,26 @@ public class BeamSQLRow implements Serializable {
     return nullFields;
   }
 
+  public Instant getWindowStart() {
+    return windowStart;
+  }
+
+  public Instant getWindowEnd() {
+    return windowEnd;
+  }
+
+  public void setWindowStart(Instant windowStart) {
+    this.windowStart = windowStart;
+  }
+
+  public void setWindowEnd(Instant windowEnd) {
+    this.windowEnd = windowEnd;
+  }
+
   @Override
   public String toString() {
-    return "BeamSQLRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
+    return "BeamSQLRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
+        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index 9b2474a..3100ba5 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -20,12 +20,14 @@ package org.apache.beam.dsls.sql.schema;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Date;
 import java.util.List;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -43,6 +45,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
   private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
   private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
   private static final DoubleCoder doubleCoder = DoubleCoder.of();
+  private static final InstantCoder instantCoder = InstantCoder.of();
 
   private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
   private BeamSqlRowCoder(){}
@@ -74,8 +77,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
           doubleCoder.encode(value.getDouble(idx), outStream, context.nested());
           break;
         case FLOAT:
-          doubleCoder.encode(Double.parseDouble(
-              String.valueOf(value.getFloat(idx))), outStream, context.nested());
+          doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested());
           break;
         case BIGINT:
           longCoder.encode(value.getLong(idx), outStream, context.nested());
@@ -83,13 +85,17 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
         case VARCHAR:
           stringCoder.encode(value.getString(idx), outStream, context.nested());
           break;
+        case TIMESTAMP:
+          longCoder.encode(value.getDate(idx).getTime(), outStream, context);
+          break;
 
         default:
           throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx));
       }
     }
-    //add a dummy field to indicate the end of record
-    intCoder.encode(value.size(), outStream, context);
+
+    instantCoder.encode(value.getWindowStart(), outStream, context.nested());
+    instantCoder.encode(value.getWindowEnd(), outStream, context);
   }
 
   @Override
@@ -128,12 +134,17 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
         case VARCHAR:
           record.addField(idx, stringCoder.decode(inStream, context.nested()));
           break;
+        case TIMESTAMP:
+          record.addField(idx, new Date(longCoder.decode(inStream, context)));
+          break;
 
         default:
           throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
       }
     }
-    intCoder.decode(inStream, context);
+
+    record.setWindowStart(instantCoder.decode(inStream, context.nested()));
+    record.setWindowEnd(instantCoder.decode(inStream, context));
 
     return record;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransform.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransform.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransform.java
new file mode 100644
index 0000000..f478363
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransform.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.transform;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.joda.time.Instant;
+
+/**
+ * Collections of {@code PTransform} and {@code DoFn} used to perform GROUP-BY operation.
+ */
+public class BeamAggregationTransform implements Serializable{
+  /**
+   * Merge KV to single record.
+   */
+  public static class MergeAggregationRecord extends DoFn<KV<BeamSQLRow, Long>, BeamSQLRow> {
+    private BeamSQLRecordType outRecordType;
+    private String aggFieldName;
+
+    public MergeAggregationRecord(BeamSQLRecordType outRecordType, String aggFieldName) {
+      this.outRecordType = outRecordType;
+      this.aggFieldName = aggFieldName;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      BeamSQLRow outRecord = new BeamSQLRow(outRecordType);
+      outRecord.updateWindowRange(c.element().getKey(), window);
+
+      KV<BeamSQLRow, Long> kvRecord = c.element();
+      for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
+        outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
+      }
+      outRecord.addField(aggFieldName, kvRecord.getValue());
+
+//      if (c.pane().isLast()) {
+      c.output(outRecord);
+//      }
+    }
+  }
+
+  /**
+   * Extracts the group-by key fields from a record.
+   */
+  public static class AggregationGroupByKeyFn
+      implements SerializableFunction<BeamSQLRow, BeamSQLRow> {
+    private List<Integer> groupByKeys;
+
+    public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
+      this.groupByKeys = new ArrayList<>();
+      for (int i : groupSet.asList()) {
+        if (i != windowFieldIdx) {
+          groupByKeys.add(i);
+        }
+      }
+    }
+
+    @Override
+    public BeamSQLRow apply(BeamSQLRow input) {
+      BeamSQLRecordType typeOfKey = exTypeOfKeyRecord(input.getDataType());
+      BeamSQLRow keyOfRecord = new BeamSQLRow(typeOfKey);
+      keyOfRecord.updateWindowRange(input, null);
+
+      for (int idx = 0; idx < groupByKeys.size(); ++idx) {
+        keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
+      }
+      return keyOfRecord;
+    }
+
+    private BeamSQLRecordType exTypeOfKeyRecord(BeamSQLRecordType dataType) {
+      BeamSQLRecordType typeOfKey = new BeamSQLRecordType();
+      for (int idx : groupByKeys) {
+        typeOfKey.addField(dataType.getFieldsName().get(idx), dataType.getFieldsType().get(idx));
+      }
+      return typeOfKey;
+    }
+
+  }
+
+  /**
+   * Assign event timestamp.
+   */
+  public static class WindowTimestampFn implements SerializableFunction<BeamSQLRow, Instant> {
+    private int windowFieldIdx = -1;
+
+    public WindowTimestampFn(int windowFieldIdx) {
+      super();
+      this.windowFieldIdx = windowFieldIdx;
+    }
+
+    @Override
+    public Instant apply(BeamSQLRow input) {
+      return new Instant(input.getDate(windowFieldIdx).getTime());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
index d018057..79dd67f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
@@ -23,6 +23,7 @@ import org.apache.beam.dsls.sql.rel.BeamProjectRel;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 
 /**
  *
@@ -48,10 +49,13 @@ public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> {
   }
 
   @ProcessElement
-  public void processElement(ProcessContext c) {
-    List<Object> results = executor.execute(c.element());
+  public void processElement(ProcessContext c, BoundedWindow window) {
+    BeamSQLRow inputRecord = c.element();
+    List<Object> results = executor.execute(inputRecord);
 
     BeamSQLRow outRow = new BeamSQLRow(outputRecordType);
+    outRow.updateWindowRange(inputRecord, window);
+
     for (int idx = 0; idx < results.size(); ++idx) {
       outRow.addField(idx, results.get(idx));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
index 625fb71..055a4d4 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -18,17 +18,18 @@
 package org.apache.beam.dsls.sql.planner;
 
 import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 /**
@@ -36,50 +37,60 @@ import org.junit.BeforeClass;
  *
  */
 public class BasePlanner {
-  public static BeamSqlRunner runner;
+  public static BeamSqlRunner runner = new BeamSqlRunner();
 
   @BeforeClass
   public static void prepare() {
-    runner = new BeamSqlRunner();
-
     runner.addTable("ORDER_DETAILS", getTable());
     runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
     runner.addTable("SUB_ORDER_RAM", getTable());
   }
 
-  @AfterClass
-  public static void close(){
-    runner = null;
-  }
-
   private static BaseBeamTable getTable() {
     final RelProtoDataType protoRowType = new RelProtoDataType() {
       @Override
       public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder()
-            .add("order_id", SqlTypeName.BIGINT)
-            .add("site_id", SqlTypeName.INTEGER)
-            .add("price", SqlTypeName.DOUBLE)
-            .add("shipping", SqlTypeName.FLOAT)
-            .add("notes", SqlTypeName.VARCHAR)
-            .build();
+        return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+            .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
       }
     };
 
-    return new MockedBeamSQLTable(protoRowType);
+    BeamSQLRecordType dataType = BeamSQLRecordType.from(
+        protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
+    BeamSQLRow row1 = new BeamSQLRow(dataType);
+    row1.addField(0, 12345L);
+    row1.addField(1, 0);
+    row1.addField(2, 10.5);
+    row1.addField(3, new Date());
+
+    BeamSQLRow row2 = new BeamSQLRow(dataType);
+    row2.addField(0, 12345L);
+    row2.addField(1, 1);
+    row2.addField(2, 20.5);
+    row2.addField(3, new Date());
+
+    BeamSQLRow row3 = new BeamSQLRow(dataType);
+    row3.addField(0, 12345L);
+    row3.addField(1, 0);
+    row3.addField(2, 20.5);
+    row3.addField(3, new Date());
+
+    BeamSQLRow row4 = new BeamSQLRow(dataType);
+    row4.addField(0, null);
+    row4.addField(1, null);
+    row4.addField(2, 20.5);
+    row4.addField(3, new Date());
+
+    return new MockedBeamSQLTable(protoRowType).withInputRecords(
+        Arrays.asList(row1, row2, row3, row4));
   }
 
   public static BaseBeamTable getTable(String bootstrapServer, String topic) {
     final RelProtoDataType protoRowType = new RelProtoDataType() {
       @Override
       public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder()
-            .add("order_id", SqlTypeName.BIGINT)
-            .add("site_id", SqlTypeName.INTEGER)
-            .add("price", SqlTypeName.DOUBLE)
-            .add("shipping", SqlTypeName.FLOAT)
-            .add("notes", SqlTypeName.VARCHAR)
-            .build();
+        return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+            .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
       }
     };
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
new file mode 100644
index 0000000..566c574
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import org.junit.Test;
+
+/**
+ * Test group-by methods.
+ *
+ */
+public class BeamGroupByExplainTest extends BasePlanner {
+
+  /**
+   * Global aggregation without window operation and without grouped fields.
+   */
+  @Test
+  public void testSimpleGroupExplain() throws Exception {
+    String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 ";
+    String plan = runner.explainQuery(sql);
+  }
+
+  /**
+   * GROUP-BY without window operation, and grouped fields.
+   */
+  @Test
+  public void testSimpleGroup2Explain() throws Exception {
+    String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
+    String plan = runner.explainQuery(sql);
+  }
+
+  /**
+   * GROUP-BY with TUMBLE window.
+   */
+  @Test
+  public void testTumbleExplain() throws Exception {
+    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
+    String plan = runner.explainQuery(sql);
+  }
+
+  /**
+   * GROUP-BY with TUMBLE window and a delay for late-arriving data.
+   */
+  @Test
+  public void testTumbleWithDelayExplain() throws Exception {
+    String sql = "SELECT order_id, site_id, "
+        + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
+        + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
+        + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
+    String plan = runner.explainQuery(sql);
+  }
+
+  /**
+   * GROUP-BY with HOP window.
+   */
+  @Test
+  public void testHopExplain() throws Exception {
+    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+        + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
+    String plan = runner.explainQuery(sql);
+  }
+
+  /**
+   * GROUP-BY with SESSION window.
+   */
+  @Test
+  public void testSessionExplain() throws Exception {
+    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+        + ", SESSION(order_time, INTERVAL '5' MINUTE)";
+    String plan = runner.explainQuery(sql);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
new file mode 100644
index 0000000..d5f8125
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import org.apache.beam.sdk.Pipeline;
+import org.junit.Test;
+
+/**
+ * Test group-by methods.
+ *
+ */
+public class BeamGroupByPipelineTest extends BasePlanner {
+
+  /**
+   * GROUP-BY without window operation, and grouped fields.
+   */
+  @Test
+  public void testSimpleGroupExplain() throws Exception {
+    String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 ";
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+  }
+
+  /**
+   * GROUP-BY without window operation, and grouped fields.
+   */
+  @Test
+  public void testSimpleGroup2Explain() throws Exception {
+    String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+  }
+
+  /**
+   * GROUP-BY with TUMBLE window.
+   */
+  @Test
+  public void testTumbleExplain() throws Exception {
+    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+  }
+
+  /**
+   * GROUP-BY with TUMBLE window.
+   */
+  @Test
+  public void testTumbleWithDelayExplain() throws Exception {
+    String sql = "SELECT order_id, site_id, "
+        + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
+        + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
+        + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+  }
+
+  /**
+   * GROUP-BY with HOP window.
+   */
+  @Test
+  public void testHopExplain() throws Exception {
+    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+        + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+  }
+
+  /**
+   * GROUP-BY with SESSION window.
+   */
+  @Test
+  public void testSessionExplain() throws Exception {
+    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
+        + ", SESSION(order_time, INTERVAL '5' MINUTE)";
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
new file mode 100644
index 0000000..83ab871
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import org.apache.calcite.tools.ValidationException;
+import org.junit.Test;
+
+/**
+ * Test group-by methods.
+ *
+ */
+public class BeamInvalidGroupByTest extends BasePlanner {
+
+  @Test(expected = ValidationException.class)
+  public void testTumble2Explain() throws Exception {
+    String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 " + "GROUP BY order_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
+    String plan = runner.explainQuery(sql);
+  }
+
+  @Test(expected = ValidationException.class)
+  public void testTumble3Explain() throws Exception {
+    String sql = "SELECT order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
+        + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
+        + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
+    String plan = runner.explainQuery(sql);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
new file mode 100644
index 0000000..39a5614
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests to execute a query.
+ *
+ */
+public class BeamPlannerAggregationSubmitTest {
+  public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+  public static BeamSqlRunner runner = new BeamSqlRunner();
+
+  @BeforeClass
+  public static void prepare() throws ParseException {
+    runner.addTable("ORDER_DETAILS", getOrderTable());
+    runner.addTable("ORDER_SUMMARY", getSummaryTable());
+  }
+
+  private static BaseBeamTable getOrderTable() throws ParseException {
+    final RelProtoDataType protoRowType = new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder().add("order_id", SqlTypeName.BIGINT)
+            .add("site_id", SqlTypeName.INTEGER)
+            .add("order_time", SqlTypeName.TIMESTAMP).build();
+      }
+    };
+
+    BeamSQLRecordType dataType = BeamSQLRecordType.from(
+        protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
+    BeamSQLRow row1 = new BeamSQLRow(dataType);
+    row1.addField(0, 12345L);
+    row1.addField(1, 1);
+    row1.addField(2, format.parse("2017-01-01 01:02:03"));
+
+    BeamSQLRow row2 = new BeamSQLRow(dataType);
+    row2.addField(0, 12345L);
+    row2.addField(1, 0);
+    row2.addField(2, format.parse("2017-01-01 01:03:04"));
+
+    BeamSQLRow row3 = new BeamSQLRow(dataType);
+    row3.addField(0, 12345L);
+    row3.addField(1, 0);
+    row3.addField(2, format.parse("2017-01-01 02:03:04"));
+
+    BeamSQLRow row4 = new BeamSQLRow(dataType);
+    row4.addField(0, 2132L);
+    row4.addField(1, 0);
+    row4.addField(2, format.parse("2017-01-01 03:04:05"));
+
+    return new MockedBeamSQLTable(protoRowType).withInputRecords(
+        Arrays.asList(row1
+            , row2, row3, row4
+            ));
+
+  }
+
+  private static BaseBeamTable getSummaryTable() {
+    final RelProtoDataType protoRowType = new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder()
+            .add("site_id", SqlTypeName.INTEGER)
+            .add("agg_hour", SqlTypeName.TIMESTAMP)
+            .add("size", SqlTypeName.BIGINT).build();
+      }
+    };
+    return new MockedBeamSQLTable(protoRowType);
+  }
+
+
+  @Test
+  public void selectWithWindowAggregation() throws Exception{
+    String sql = "INSERT INTO ORDER_SUMMARY(SITE_ID, agg_hour, SIZE) "
+        + "SELECT site_id, TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
+        + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 1 " + "GROUP BY site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
+
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+
+    pipeline.run().waitUntilFinish();
+
+    Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
+    BeamSQLRow result = MockedBeamSQLTable.CONTENT.get(0);
+    Assert.assertEquals(1, result.getInteger(0));
+    Assert.assertEquals(format.parse("2017-01-01 01:00:00"), result.getDate(1));
+    Assert.assertEquals(1L, result.getLong(2));
+  }
+
+  @Test
+  public void selectWithoutWindowAggregation() throws Exception{
+    String sql = "INSERT INTO ORDER_SUMMARY(SITE_ID, SIZE) "
+        + "SELECT site_id, COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
+        + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
+
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+
+    pipeline.run().waitUntilFinish();
+
+    Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
+    Assert.assertEquals("site_id=0,agg_hour=null,size=3",
+        MockedBeamSQLTable.CONTENT.get(0).valueInString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
index 5d1052b..9b6b20a 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
@@ -32,7 +32,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
     String plan = runner.explainQuery(sql);
 
     String expectedPlan =
-        "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], shipping=[$3], notes=[$4])\n"
+        "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n"
         + "  BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
     Assert.assertEquals("explain doesn't match", expectedPlan, plan);
   }
@@ -58,12 +58,10 @@ public class BeamPlannerExplainTest extends BasePlanner {
 
     String expectedPlan =
         "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n"
-        + "  BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], shipping=[null],"
-            + " notes=[null])\n"
+        + "  BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n"
         + "    BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
         + "      BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
         + "        BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
     Assert.assertEquals("explain doesn't match", expectedPlan, plan);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
index 1ca9eb3..5435049 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
+import org.apache.beam.sdk.Pipeline;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,13 +29,14 @@ public class BeamPlannerSubmitTest extends BasePlanner {
   @Test
   public void insertSelectFilter() throws Exception {
     String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
-        + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
+    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
 
-    runner.submitQuery(sql);
+    pipeline.run().waitUntilFinish();
 
     Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
-    Assert.assertEquals("order_id=12345,site_id=0,price=20.5,shipping=null,notes=null",
-        MockedBeamSQLTable.CONTENT.get(0));
+    Assert.assertTrue(MockedBeamSQLTable.CONTENT.get(0).valueInString()
+        .contains("order_id=12345,site_id=0,price=20.5,order_time="));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3c6d60fb/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
index 538607f..611bd73 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -19,7 +19,6 @@ package org.apache.beam.dsls.sql.planner;
 
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -38,12 +37,19 @@ import org.apache.calcite.rel.type.RelProtoDataType;
  */
 public class MockedBeamSQLTable extends BaseBeamTable {
 
-  public static final List<String> CONTENT = new ArrayList<>();
+  public static final List<BeamSQLRow> CONTENT = new ArrayList<>();
+
+  private List<BeamSQLRow> inputRecords;
 
   public MockedBeamSQLTable(RelProtoDataType protoRowType) {
     super(protoRowType);
   }
 
+  public MockedBeamSQLTable withInputRecords(List<BeamSQLRow> inputRecords){
+    this.inputRecords = inputRecords;
+    return this;
+  }
+
   @Override
   public BeamIOType getSourceType() {
     return BeamIOType.UNBOUNDED;
@@ -51,31 +57,7 @@ public class MockedBeamSQLTable extends BaseBeamTable {
 
   @Override
   public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
-    BeamSQLRow row1 = new BeamSQLRow(beamSqlRecordType);
-    row1.addField(0, 12345L);
-    row1.addField(1, 0);
-    row1.addField(2, 10.5);
-    row1.addField(3, 123.4f);
-
-    BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType);
-    row2.addField(0, 12345L);
-    row2.addField(1, 1);
-    row2.addField(2, 20.5);
-    row2.addField(3, 234.5f);
-
-    BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType);
-    row3.addField(0, 12345L);
-    row3.addField(1, 0);
-    row3.addField(2, 20.5);
-    row3.addField(3, 345.6f);
-
-    BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType);
-    row4.addField(0, null);
-    row4.addField(1, null);
-    row4.addField(2, 20.5);
-    row4.addField(3, 456.7f);
-
-    return Create.of(row1, row2, row3);
+    return Create.of(inputRecords);
   }
 
   @Override
@@ -100,7 +82,7 @@ public class MockedBeamSQLTable extends BaseBeamTable {
 
         @ProcessElement
         public void processElement(ProcessContext c) {
-          CONTENT.add(c.element().valueInString());
+          CONTENT.add(c.element());
         }
 
         @Teardown


[2/2] beam git commit: This closes #3039

Posted by dh...@apache.org.
This closes #3039


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6729a027
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6729a027
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6729a027

Branch: refs/heads/DSL_SQL
Commit: 6729a027daea04d0ad38f8a83ad1baafa465169f
Parents: 7e918a7 3c6d60f
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 10 10:03:17 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 10 10:03:17 2017 -0700

----------------------------------------------------------------------
 dsls/sql/pom.xml                                |   6 +-
 .../dsls/sql/interpreter/BeamSQLFnExecutor.java |  20 ++-
 .../interpreter/operator/BeamSqlPrimitive.java  |  10 ++
 .../operator/BeamSqlWindowEndExpression.java    |  42 +++++
 .../operator/BeamSqlWindowExpression.java       |  50 ++++++
 .../operator/BeamSqlWindowStartExpression.java  |  43 +++++
 .../dsls/sql/planner/BeamPipelineCreator.java   |  17 +-
 .../beam/dsls/sql/planner/BeamRuleSets.java     |   4 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   | 154 ++++++++++++++++++
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |   4 +-
 .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java |   2 +-
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |   2 +-
 .../beam/dsls/sql/rel/BeamProjectRel.java       |   4 +-
 .../beam/dsls/sql/rule/BeamAggregationRule.java | 163 +++++++++++++++++++
 .../beam/dsls/sql/schema/BeamSQLRecordType.java |   5 +
 .../apache/beam/dsls/sql/schema/BeamSQLRow.java |  64 +++++++-
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  21 ++-
 .../sql/transform/BeamAggregationTransform.java | 120 ++++++++++++++
 .../dsls/sql/transform/BeamSQLProjectFn.java    |   8 +-
 .../beam/dsls/sql/planner/BasePlanner.java      |  61 ++++---
 .../sql/planner/BeamGroupByExplainTest.java     |  93 +++++++++++
 .../sql/planner/BeamGroupByPipelineTest.java    |  94 +++++++++++
 .../sql/planner/BeamInvalidGroupByTest.java     |  44 +++++
 .../BeamPlannerAggregationSubmitTest.java       | 136 ++++++++++++++++
 .../sql/planner/BeamPlannerExplainTest.java     |   6 +-
 .../dsls/sql/planner/BeamPlannerSubmitTest.java |  12 +-
 .../dsls/sql/planner/MockedBeamSQLTable.java    |  38 ++---
 27 files changed, 1137 insertions(+), 86 deletions(-)
----------------------------------------------------------------------