You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:54 UTC

[31/59] beam git commit: rename package org.apache.beam.dsls.sql to org.apache.beam.sdk.extensions.sql

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java
new file mode 100644
index 0000000..9655ebd
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines table schemas that map to Beam IO components.
+ *
+ */
+package org.apache.beam.sdk.extensions.sql.schema;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
new file mode 100644
index 0000000..c44faab
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.schema.text;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code BeamTextCSVTable} is a {@code BeamTextTable} whose lines are formatted as CSV.
+ *
+ * <p>
+ * {@link CSVFormat} itself has many dialects; check its javadoc for more info.
+ * </p>
+ */
+public class BeamTextCSVTable extends BeamTextTable {
+  private static final Logger LOG = LoggerFactory // NOTE(review): not used in this class
+      .getLogger(BeamTextCSVTable.class);
+
+  private CSVFormat csvFormat; // passed to the IOReader and IOWriter built below
+
+  /**
+   * Creates a CSV table that uses the {@link CSVFormat#DEFAULT DEFAULT} format.
+   */
+  public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern)  {
+    this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
+  }
+
+  public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) { // variant with a caller-supplied CSV dialect
+    super(beamSqlRowType, filePattern);
+    this.csvFormat = csvFormat;
+  }
+
+  @Override
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
+        .apply("parseCSVLine", // parses every text line into a BeamSqlRow
+            new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
+  }
+
+  @Override // the returned transform encodes rows as CSV lines and writes them to filePattern
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+    return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
new file mode 100644
index 0000000..06109c3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@code PTransform} that parses the text lines of a {@code BeamTextCSVTable} into rows.
+ */
+public class BeamTextCSVTableIOReader
+    extends PTransform<PCollection<String>, PCollection<BeamSqlRow>> implements Serializable {
+  private String filePattern;
+  protected BeamSqlRowType beamSqlRowType;
+  protected CSVFormat csvFormat;
+
+  public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) {
+    this.beamSqlRowType = beamSqlRowType;
+    this.csvFormat = csvFormat;
+    this.filePattern = filePattern;
+  }
+
+  @Override
+  public PCollection<BeamSqlRow> expand(PCollection<String> input) {
+    DoFn<String, BeamSqlRow> parseLine = new DoFn<String, BeamSqlRow>() {
+      @ProcessElement
+      public void processElement(ProcessContext ctx) {
+        String csvLine = ctx.element();
+        ctx.output(BeamTableUtils.csvLine2BeamSqlRow(csvFormat, csvLine, beamSqlRowType));
+      }
+    };
+    return input.apply(ParDo.of(parseLine));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
new file mode 100644
index 0000000..1684b37
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@code PTransform} that writes the rows of a {@code BeamTextCSVTable} as CSV text lines.
+ */
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
+    implements Serializable {
+  private String filePattern;
+  protected BeamSqlRowType beamSqlRowType;
+  protected CSVFormat csvFormat;
+
+  public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) {
+    this.beamSqlRowType = beamSqlRowType;
+    this.csvFormat = csvFormat;
+    this.filePattern = filePattern;
+  }
+
+  @Override public PDone expand(PCollection<BeamSqlRow> input) {
+    DoFn<BeamSqlRow, String> encodeRow = new DoFn<BeamSqlRow, String>() {
+      @ProcessElement
+      public void processElement(ProcessContext ctx) {
+        ctx.output(BeamTableUtils.beamSqlRow2CsvLine(ctx.element(), csvFormat));
+      }
+    };
+    return input.apply("encodeRecord", ParDo.of(encodeRow)).apply(TextIO.write().to(filePattern));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
new file mode 100644
index 0000000..e85608d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+
+/**
+ * {@code BeamTextTable} represents a text file or directory (backed by {@code TextIO}).
+ */
+public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
+  protected String filePattern; // file pattern of the table, visible to subclasses
+
+  protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) {
+    super(beamSqlRowType);
+    this.filePattern = filePattern;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.BOUNDED; // text tables always report a bounded source
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java
new file mode 100644
index 0000000..f914e2e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema for text files.
+ */
+package org.apache.beam.sdk.extensions.sql.schema.text;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java
new file mode 100644
index 0000000..6a27da8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.transform;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.joda.time.Instant;
+
+/**
+ * Collection of {@code PTransform}s and {@code DoFn}s used to perform GROUP-BY operations.
+ */
+public class BeamAggregationTransforms implements Serializable{
+  /**
+   * Flattens a grouped (key row, aggregate row) pair into one output {@code BeamSqlRow}.
+   */
+  public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
+    private BeamSqlRowType outRowType;
+    private List<String> aggFieldNames;
+    private int windowStartFieldIdx;
+
+    public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList,
+        int windowStartFieldIdx) {
+      this.outRowType = outRowType;
+      this.windowStartFieldIdx = windowStartFieldIdx;
+      this.aggFieldNames = new ArrayList<>();
+      for (AggregateCall aggCall : aggList) {
+        this.aggFieldNames.add(aggCall.getName());
+      }
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      KV<BeamSqlRow, BeamSqlRow> grouped = c.element();
+      BeamSqlRow groupKey = grouped.getKey();
+      BeamSqlRow aggValues = grouped.getValue();
+      BeamSqlRow merged = new BeamSqlRow(outRowType);
+      merged.updateWindowRange(groupKey, window);
+      for (String keyField : groupKey.getDataType().getFieldsName()) {
+        merged.addField(keyField, groupKey.getFieldValue(keyField));
+      }
+      for (int i = 0; i < aggFieldNames.size(); i++) {
+        merged.addField(aggFieldNames.get(i), aggValues.getFieldValue(i));
+      }
+      if (windowStartFieldIdx != -1) {
+        merged.addField(windowStartFieldIdx, merged.getWindowStart().toDate());
+      }
+      c.output(merged);
+    }
+  }
+
+  /**
+   * extract group-by fields.
+   */
+  public static class AggregationGroupByKeyFn
+      implements SerializableFunction<BeamSqlRow, BeamSqlRow> {
+    private List<Integer> groupByKeys;
+
+    public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
+      this.groupByKeys = new ArrayList<>();
+      for (int i : groupSet.asList()) {
+        if (i != windowFieldIdx) {
+          groupByKeys.add(i);
+        }
+      }
+    }
+
+    @Override
+    public BeamSqlRow apply(BeamSqlRow input) {
+      BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType());
+      BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
+      keyOfRecord.updateWindowRange(input, null);
+
+      for (int idx = 0; idx < groupByKeys.size(); ++idx) {
+        keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
+      }
+      return keyOfRecord;
+    }
+
+    private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
+      List<String> fieldNames = new ArrayList<>();
+      List<Integer> fieldTypes = new ArrayList<>();
+      for (int idx : groupByKeys) {
+        fieldNames.add(dataType.getFieldsName().get(idx));
+        fieldTypes.add(dataType.getFieldsType().get(idx));
+      }
+      return BeamSqlRowType.create(fieldNames, fieldTypes);
+    }
+  }
+
+  /**
+   * Assigns each row an event timestamp taken from the date stored in its window field.
+   * The window field is addressed by the index given at construction time.
+   */
+  public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> {
+    private int windowFieldIdx;
+
+    public WindowTimestampFn(int windowFieldIdx) {
+      this.windowFieldIdx = windowFieldIdx;
+    }
+
+    @Override
+    public Instant apply(BeamSqlRow input) {
+      return new Instant(input.getDate(windowFieldIdx).getTime());
+    }
+  }
+
+  /**
+   * Runs a list of aggregate calls (built-in or Calcite UDAF) inside one Beam {@code CombineFn}.
+   */
+  public static class AggregationAdaptor
+    extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> {
+    private List<BeamSqlUdaf> aggregators; // one per aggregate call, in call order
+    private List<BeamSqlExpression> sourceFieldExps; // input expression of aggregators.get(i)
+    private BeamSqlRowType finalRowType; // schema of the row built by extractOutput
+
+    public AggregationAdaptor(List<AggregateCall> aggregationCalls,
+        BeamSqlRowType sourceRowType) {
+      aggregators = new ArrayList<>();
+      sourceFieldExps = new ArrayList<>();
+      List<String> outFieldsName = new ArrayList<>();
+      List<Integer> outFieldsType = new ArrayList<>();
+      for (AggregateCall call : aggregationCalls) { // input is argument 0, or field 0 if no args
+        int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0;
+        BeamSqlExpression sourceExp = new BeamSqlInputRefExpression(
+            CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex);
+        sourceFieldExps.add(sourceExp);
+
+        outFieldsName.add(call.name);
+        int outFieldType = CalciteUtils.toJavaType(call.type.getSqlTypeName());
+        outFieldsType.add(outFieldType);
+
+        switch (call.getAggregation().getName()) { // built-ins are selected by name
+          case "COUNT":
+            aggregators.add(new BeamBuiltinAggregations.Count());
+            break;
+          case "MAX":
+            aggregators.add(BeamBuiltinAggregations.Max.create(call.type.getSqlTypeName()));
+            break;
+          case "MIN":
+            aggregators.add(BeamBuiltinAggregations.Min.create(call.type.getSqlTypeName()));
+            break;
+          case "SUM":
+            aggregators.add(BeamBuiltinAggregations.Sum.create(call.type.getSqlTypeName()));
+            break;
+          case "AVG":
+            aggregators.add(BeamBuiltinAggregations.Avg.create(call.type.getSqlTypeName()));
+            break;
+          default: // not a built-in name: only user-defined aggregates are accepted
+            if (call.getAggregation() instanceof SqlUserDefinedAggFunction) {
+              // handle UDAF: instantiate its declaring class reflectively.
+              SqlUserDefinedAggFunction udaf = (SqlUserDefinedAggFunction) call.getAggregation();
+              AggregateFunctionImpl fn = (AggregateFunctionImpl) udaf.function;
+              try {
+                aggregators.add((BeamSqlUdaf) fn.declaringClass.newInstance());
+              } catch (Exception e) {
+                throw new IllegalStateException(e);
+              }
+            } else {
+              throw new UnsupportedOperationException(
+                  String.format("Aggregator [%s] is not supported",
+                  call.getAggregation().getName()));
+            }
+          break;
+        }
+      }
+      finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType);
+    }
+    @Override
+    public AggregationAccumulator createAccumulator() { // one init() value per aggregator
+      AggregationAccumulator initialAccu = new AggregationAccumulator();
+      for (BeamSqlUdaf agg : aggregators) {
+        initialAccu.accumulatorElements.add(agg.init());
+      }
+      return initialAccu;
+    }
+    @Override
+    public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamSqlRow input) {
+      AggregationAccumulator deltaAcc = new AggregationAccumulator(); // fresh result holder
+      for (int idx = 0; idx < aggregators.size(); ++idx) {
+        deltaAcc.accumulatorElements.add(
+            aggregators.get(idx).add(accumulator.accumulatorElements.get(idx),
+            sourceFieldExps.get(idx).evaluate(input).getValue()));
+      }
+      return deltaAcc;
+    }
+    @Override
+    public AggregationAccumulator mergeAccumulators(Iterable<AggregationAccumulator> accumulators) {
+      AggregationAccumulator deltaAcc = new AggregationAccumulator();
+      for (int idx = 0; idx < aggregators.size(); ++idx) {
+        List accs = new ArrayList<>(); // idx-th element of each accumulator
+        Iterator<AggregationAccumulator> ite = accumulators.iterator();
+        while (ite.hasNext()) {
+          accs.add(ite.next().accumulatorElements.get(idx));
+        }
+        deltaAcc.accumulatorElements.add(aggregators.get(idx).merge(accs));
+      }
+      return deltaAcc;
+    }
+    @Override
+    public BeamSqlRow extractOutput(AggregationAccumulator accumulator) {
+      BeamSqlRow result = new BeamSqlRow(finalRowType); // field idx = result of aggregator idx
+      for (int idx = 0; idx < aggregators.size(); ++idx) {
+        result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
+      }
+      return result;
+    }
+    @Override
+    public Coder<AggregationAccumulator> getAccumulatorCoder(
+        CoderRegistry registry, Coder<BeamSqlRow> inputCoder)
+        throws CannotProvideCoderException { // note: also registers BigDecimalCoder on registry
+      registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of());
+      List<Coder> aggAccuCoderList = new ArrayList<>();
+      for (BeamSqlUdaf udaf : aggregators) {
+        aggAccuCoderList.add(udaf.getAccumulatorCoder(registry));
+      }
+      return new AggregationAccumulatorCoder(aggAccuCoderList); // coders in aggregator order
+    }
+  }
+
+  /**
+   * A class to hold the accumulator objects of all aggregators, one list element per aggregator.
+   */
+  public static class AggregationAccumulator{
+    private List accumulatorElements = new ArrayList<>(); // raw list; element i is for aggregator i
+  }
+
+  /**
+   * Coder for {@link AggregationAccumulator}: writes the element count, then every element.
+   */
+  public static class AggregationAccumulatorCoder extends CustomCoder<AggregationAccumulator>{
+    private VarIntCoder sizeCoder = VarIntCoder.of();
+    private List<Coder> elementCoders;
+
+    public AggregationAccumulatorCoder(List<Coder> elementCoders) {
+      this.elementCoders = elementCoders;
+    }
+
+    @Override
+    public void encode(AggregationAccumulator value, OutputStream outStream)
+        throws CoderException, IOException {
+      List elements = value.accumulatorElements;
+      sizeCoder.encode(elements.size(), outStream);
+      for (int i = 0; i < elements.size(); i++) {
+        elementCoders.get(i).encode(elements.get(i), outStream);
+      }
+    }
+
+    @Override
+    public AggregationAccumulator decode(InputStream inStream) throws CoderException, IOException {
+      AggregationAccumulator decoded = new AggregationAccumulator();
+      for (int i = 0, count = sizeCoder.decode(inStream); i < count; i++) {
+        decoded.accumulatorElements.add(elementCoders.get(i).decode(inStream));
+      }
+      return decoded;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java
new file mode 100644
index 0000000..1183668
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.transform;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.Iterator;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Built-in aggregation functions for COUNT/MAX/MIN/SUM/AVG.
+ */
+class BeamBuiltinAggregations {
+  /**
+   * Built-in aggregation for COUNT.
+   *
+   * <p>The accumulator is the number of inputs added so far; the input value itself is never
+   * inspected.
+   */
+  public static final class Count<T> extends BeamSqlUdaf<T, Long, Long> {
+    public Count() {}
+
+    /** Starts counting from zero. */
+    @Override
+    public Long init() {
+      return 0L;
+    }
+
+    /** Counts one more input, whatever its value. */
+    @Override
+    public Long add(Long accumulator, T input) {
+      long incremented = accumulator + 1;
+      return incremented;
+    }
+
+    /** Adds up the partial counts. */
+    @Override
+    public Long merge(Iterable<Long> accumulators) {
+      long total = 0L;
+      for (Long partial : accumulators) {
+        total += partial;
+      }
+      return total;
+    }
+
+    /** The count is reported as accumulated. */
+    @Override
+    public Long result(Long accumulator) {
+      return accumulator;
+    }
+  }
+
+  /**
+   * Built-in aggregation for MAX.
+   *
+   * <p>The accumulator is the largest value added so far, or {@code null} while nothing has been
+   * added. {@code null} inputs and {@code null} accumulators are skipped rather than compared.
+   */
+  public static final class Max<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> {
+    /**
+     * Creates a MAX aggregation for a field of the given SQL type.
+     *
+     * @param fieldType SQL type of the aggregated field
+     * @return a new {@code Max} bound to {@code fieldType}
+     * @throws UnsupportedOperationException if MAX does not handle {@code fieldType}
+     */
+    public static Max create(SqlTypeName fieldType) {
+      switch (fieldType) {
+        case INTEGER:
+          return new BeamBuiltinAggregations.Max<Integer>(fieldType);
+        case SMALLINT:
+          return new BeamBuiltinAggregations.Max<Short>(fieldType);
+        case TINYINT:
+          return new BeamBuiltinAggregations.Max<Byte>(fieldType);
+        case BIGINT:
+          return new BeamBuiltinAggregations.Max<Long>(fieldType);
+        case FLOAT:
+          return new BeamBuiltinAggregations.Max<Float>(fieldType);
+        case DOUBLE:
+          return new BeamBuiltinAggregations.Max<Double>(fieldType);
+        case TIMESTAMP:
+          return new BeamBuiltinAggregations.Max<Date>(fieldType);
+        case DECIMAL:
+          return new BeamBuiltinAggregations.Max<BigDecimal>(fieldType);
+        default:
+          throw new UnsupportedOperationException(
+              String.format("[%s] is not support in MAX", fieldType));
+      }
+    }
+
+    private final SqlTypeName fieldType;
+    private Max(SqlTypeName fieldType) {
+      this.fieldType = fieldType;
+    }
+
+    /** Returns {@code null}, meaning "no value seen yet". */
+    @Override
+    public T init() {
+      return null;
+    }
+
+    /**
+     * Keeps the larger of the accumulated value and {@code input}.
+     *
+     * <p>A {@code null} input leaves the accumulator untouched instead of failing inside
+     * {@code compareTo}.
+     */
+    @Override
+    public T add(T accumulator, T input) {
+      if (input == null) {
+        return accumulator;
+      }
+      return (accumulator == null || accumulator.compareTo(input) < 0) ? input : accumulator;
+    }
+
+    /**
+     * Returns the largest non-null accumulator, or {@code null} when there is none.
+     *
+     * <p>An accumulator that never received an input still holds the {@code null} produced by
+     * {@link #init()}, so such entries (and an empty iterable) must not be dereferenced.
+     */
+    @Override
+    public T merge(Iterable<T> accumulators) {
+      T mergedV = null;
+      for (T v : accumulators) {
+        if (v == null) {
+          continue;
+        }
+        // on a tie the later accumulator wins, as before
+        if (mergedV == null || mergedV.compareTo(v) <= 0) {
+          mergedV = v;
+        }
+      }
+      return mergedV;
+    }
+
+    /** The accumulated maximum is the result. */
+    @Override
+    public T result(T accumulator) {
+      return accumulator;
+    }
+
+    /**
+     * Uses the coder registered for the field's SQL type.
+     *
+     * <p>NOTE(review): the accumulator can be {@code null}; confirm the coders returned by
+     * {@code getSqlTypeCoder} accept that, or wrap them in a nullable coder.
+     */
+    @Override
+    public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException {
+      return BeamBuiltinAggregations.getSqlTypeCoder(fieldType);
+    }
+  }
+
+  /**
+   * Built-in aggregation for MIN.
+   *
+   * <p>The accumulator is the smallest value added so far, or {@code null} while nothing has been
+   * added. {@code null} inputs and {@code null} accumulators are skipped rather than compared.
+   */
+  public static final class Min<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> {
+    /**
+     * Creates a MIN aggregation for a field of the given SQL type.
+     *
+     * @param fieldType SQL type of the aggregated field
+     * @return a new {@code Min} bound to {@code fieldType}
+     * @throws UnsupportedOperationException if MIN does not handle {@code fieldType}
+     */
+    public static Min create(SqlTypeName fieldType) {
+      switch (fieldType) {
+        case INTEGER:
+          return new BeamBuiltinAggregations.Min<Integer>(fieldType);
+        case SMALLINT:
+          return new BeamBuiltinAggregations.Min<Short>(fieldType);
+        case TINYINT:
+          return new BeamBuiltinAggregations.Min<Byte>(fieldType);
+        case BIGINT:
+          return new BeamBuiltinAggregations.Min<Long>(fieldType);
+        case FLOAT:
+          return new BeamBuiltinAggregations.Min<Float>(fieldType);
+        case DOUBLE:
+          return new BeamBuiltinAggregations.Min<Double>(fieldType);
+        case TIMESTAMP:
+          return new BeamBuiltinAggregations.Min<Date>(fieldType);
+        case DECIMAL:
+          return new BeamBuiltinAggregations.Min<BigDecimal>(fieldType);
+        default:
+          throw new UnsupportedOperationException(
+              String.format("[%s] is not support in MIN", fieldType));
+      }
+    }
+
+    private final SqlTypeName fieldType;
+    private Min(SqlTypeName fieldType) {
+      this.fieldType = fieldType;
+    }
+
+    /** Returns {@code null}, meaning "no value seen yet". */
+    @Override
+    public T init() {
+      return null;
+    }
+
+    /**
+     * Keeps the smaller of the accumulated value and {@code input}.
+     *
+     * <p>A {@code null} input leaves the accumulator untouched instead of failing inside
+     * {@code compareTo}.
+     */
+    @Override
+    public T add(T accumulator, T input) {
+      if (input == null) {
+        return accumulator;
+      }
+      return (accumulator == null || accumulator.compareTo(input) > 0) ? input : accumulator;
+    }
+
+    /**
+     * Returns the smallest non-null accumulator, or {@code null} when there is none.
+     *
+     * <p>An accumulator that never received an input still holds the {@code null} produced by
+     * {@link #init()}, so such entries (and an empty iterable) must not be dereferenced.
+     */
+    @Override
+    public T merge(Iterable<T> accumulators) {
+      T mergedV = null;
+      for (T v : accumulators) {
+        if (v == null) {
+          continue;
+        }
+        // on a tie the later accumulator wins, as before
+        if (mergedV == null || mergedV.compareTo(v) >= 0) {
+          mergedV = v;
+        }
+      }
+      return mergedV;
+    }
+
+    /** The accumulated minimum is the result. */
+    @Override
+    public T result(T accumulator) {
+      return accumulator;
+    }
+
+    /**
+     * Uses the coder registered for the field's SQL type.
+     *
+     * <p>NOTE(review): the accumulator can be {@code null}; confirm the coders returned by
+     * {@code getSqlTypeCoder} accept that, or wrap them in a nullable coder.
+     */
+    @Override
+    public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException {
+      return BeamBuiltinAggregations.getSqlTypeCoder(fieldType);
+    }
+  }
+
+  /**
+   * Built-in aggregation for SUM.
+   *
+   * <p>Inputs are accumulated as a {@link BigDecimal} and converted back to the field's own
+   * numeric type in {@link #result(BigDecimal)}. {@code null} inputs are skipped.
+   */
+  public static final class Sum<T> extends BeamSqlUdaf<T, BigDecimal, T> {
+    /**
+     * Creates a SUM aggregation for a field of the given SQL type.
+     *
+     * <p>TIMESTAMP is rejected here: {@link #add(BigDecimal, Object)} parses the input's string
+     * form as a number, which can never succeed for a {@code Date}, and
+     * {@link #result(BigDecimal)} has no conversion for it.
+     *
+     * @param fieldType SQL type of the aggregated field
+     * @return a new {@code Sum} bound to {@code fieldType}
+     * @throws UnsupportedOperationException if SUM does not handle {@code fieldType}
+     */
+    public static Sum create(SqlTypeName fieldType) {
+      switch (fieldType) {
+        case INTEGER:
+          return new BeamBuiltinAggregations.Sum<Integer>(fieldType);
+        case SMALLINT:
+          return new BeamBuiltinAggregations.Sum<Short>(fieldType);
+        case TINYINT:
+          return new BeamBuiltinAggregations.Sum<Byte>(fieldType);
+        case BIGINT:
+          return new BeamBuiltinAggregations.Sum<Long>(fieldType);
+        case FLOAT:
+          return new BeamBuiltinAggregations.Sum<Float>(fieldType);
+        case DOUBLE:
+          return new BeamBuiltinAggregations.Sum<Double>(fieldType);
+        case DECIMAL:
+          return new BeamBuiltinAggregations.Sum<BigDecimal>(fieldType);
+        default:
+          throw new UnsupportedOperationException(
+              String.format("[%s] is not support in SUM", fieldType));
+      }
+    }
+
+    private final SqlTypeName fieldType;
+    private Sum(SqlTypeName fieldType) {
+      this.fieldType = fieldType;
+    }
+
+    /** Starts from zero. */
+    @Override
+    public BigDecimal init() {
+      return new BigDecimal(0);
+    }
+
+    /**
+     * Adds {@code input} to the running total; a {@code null} input contributes nothing.
+     */
+    @Override
+    public BigDecimal add(BigDecimal accumulator, T input) {
+      if (input == null) {
+        return accumulator;
+      }
+      // go through the string form so that float/double inputs keep their printed value
+      return accumulator.add(new BigDecimal(input.toString()));
+    }
+
+    /** Adds up the partial totals. */
+    @Override
+    public BigDecimal merge(Iterable<BigDecimal> accumulators) {
+      BigDecimal v = new BigDecimal(0);
+      for (BigDecimal partial : accumulators) {
+        v = v.add(partial);
+      }
+      return v;
+    }
+
+    /**
+     * Converts the total to the Java type matching the field's SQL type.
+     *
+     * <p>The narrowing conversions ({@code intValue()}, {@code shortValue()}, ...) do not check
+     * for overflow.
+     */
+    @Override
+    public T result(BigDecimal accumulator) {
+      Object result = null;
+      switch (fieldType) {
+        case INTEGER:
+          result = accumulator.intValue();
+          break;
+        case BIGINT:
+          result = accumulator.longValue();
+          break;
+        case SMALLINT:
+          result = accumulator.shortValue();
+          break;
+        case TINYINT:
+          result = accumulator.byteValue();
+          break;
+        case DOUBLE:
+          result = accumulator.doubleValue();
+          break;
+        case FLOAT:
+          result = accumulator.floatValue();
+          break;
+        case DECIMAL:
+          result = accumulator;
+          break;
+        default:
+          // not reachable through create(), which only admits the types handled above
+          break;
+      }
+      return (T) result;
+    }
+  }
+
+  /**
+   * Built-in aggregation for AVG.
+   *
+   * <p>The accumulator pairs the running total (key) with the number of values added (value).
+   * {@code null} inputs are skipped and do not count towards the average.
+   */
+  public static final class Avg<T> extends BeamSqlUdaf<T, KV<BigDecimal, Long>, T> {
+    /**
+     * Creates an AVG aggregation for a field of the given SQL type.
+     *
+     * <p>TIMESTAMP is rejected here: {@link #add(KV, Object)} parses the input's string form as
+     * a number, which can never succeed for a {@code Date}, and {@link #result(KV)} has no
+     * conversion for it.
+     *
+     * @param fieldType SQL type of the aggregated field
+     * @return a new {@code Avg} bound to {@code fieldType}
+     * @throws UnsupportedOperationException if AVG does not handle {@code fieldType}
+     */
+    public static Avg create(SqlTypeName fieldType) {
+      switch (fieldType) {
+        case INTEGER:
+          return new BeamBuiltinAggregations.Avg<Integer>(fieldType);
+        case SMALLINT:
+          return new BeamBuiltinAggregations.Avg<Short>(fieldType);
+        case TINYINT:
+          return new BeamBuiltinAggregations.Avg<Byte>(fieldType);
+        case BIGINT:
+          return new BeamBuiltinAggregations.Avg<Long>(fieldType);
+        case FLOAT:
+          return new BeamBuiltinAggregations.Avg<Float>(fieldType);
+        case DOUBLE:
+          return new BeamBuiltinAggregations.Avg<Double>(fieldType);
+        case DECIMAL:
+          return new BeamBuiltinAggregations.Avg<BigDecimal>(fieldType);
+        default:
+          throw new UnsupportedOperationException(
+              String.format("[%s] is not support in AVG", fieldType));
+      }
+    }
+
+    private final SqlTypeName fieldType;
+    private Avg(SqlTypeName fieldType) {
+      this.fieldType = fieldType;
+    }
+
+    /** Starts from a zero total over zero values. */
+    @Override
+    public KV<BigDecimal, Long> init() {
+      return KV.of(new BigDecimal(0), 0L);
+    }
+
+    /**
+     * Adds {@code input} to the total and bumps the count; a {@code null} input changes neither.
+     */
+    @Override
+    public KV<BigDecimal, Long> add(KV<BigDecimal, Long> accumulator, T input) {
+      if (input == null) {
+        return accumulator;
+      }
+      return KV.of(
+              accumulator.getKey().add(new BigDecimal(input.toString())),
+              accumulator.getValue() + 1);
+    }
+
+    /** Adds up the partial totals and the partial counts. */
+    @Override
+    public KV<BigDecimal, Long> merge(Iterable<KV<BigDecimal, Long>> accumulators) {
+      BigDecimal v = new BigDecimal(0);
+      long s = 0;
+      for (KV<BigDecimal, Long> r : accumulators) {
+        v = v.add(r.getKey());
+        s += r.getValue();
+      }
+      return KV.of(v, s);
+    }
+
+    /**
+     * Divides the total by the count and converts the quotient to the Java type matching the
+     * field's SQL type.
+     *
+     * <p>The quotient is rounded to {@code MathContext.DECIMAL128}: an unbounded
+     * {@code BigDecimal.divide} throws {@code ArithmeticException} whenever the exact quotient
+     * has a non-terminating decimal expansion (for example 5 / 3).
+     *
+     * @return the average, or {@code null} when no value was accumulated
+     */
+    @Override
+    public T result(KV<BigDecimal, Long> accumulator) {
+      long count = accumulator.getValue();
+      if (count == 0) {
+        // nothing was aggregated, so there is no average (and no valid divisor)
+        return null;
+      }
+      BigDecimal decimalAvg = accumulator.getKey().divide(
+          new BigDecimal(count), java.math.MathContext.DECIMAL128);
+      Object result = null;
+      switch (fieldType) {
+        case INTEGER:
+          result = decimalAvg.intValue();
+          break;
+        case BIGINT:
+          result = decimalAvg.longValue();
+          break;
+        case SMALLINT:
+          result = decimalAvg.shortValue();
+          break;
+        case TINYINT:
+          result = decimalAvg.byteValue();
+          break;
+        case DOUBLE:
+          result = decimalAvg.doubleValue();
+          break;
+        case FLOAT:
+          result = decimalAvg.floatValue();
+          break;
+        case DECIMAL:
+          result = decimalAvg;
+          break;
+        default:
+          // not reachable through create(), which only admits the types handled above
+          break;
+      }
+      return (T) result;
+    }
+
+    /** Encodes the accumulator as a decimal total followed by a var-long count. */
+    @Override
+    public Coder<KV<BigDecimal, Long>> getAccumulatorCoder(CoderRegistry registry)
+        throws CannotProvideCoderException {
+      return KvCoder.of(BigDecimalCoder.of(), VarLongCoder.of());
+    }
+  }
+
+  /**
+   * Find {@link Coder} for Beam SQL field types.
+   */
+  private static Coder getSqlTypeCoder(SqlTypeName sqlType) {
+    switch (sqlType) {
+      case INTEGER:
+        return VarIntCoder.of();
+      case SMALLINT:
+        return SerializableCoder.of(Short.class);
+      case TINYINT:
+        return ByteCoder.of();
+      case BIGINT:
+        return VarLongCoder.of();
+      case FLOAT:
+        return SerializableCoder.of(Float.class);
+      case DOUBLE:
+        return DoubleCoder.of();
+      case TIMESTAMP:
+        return SerializableCoder.of(Date.class);
+      case DECIMAL:
+        return BigDecimalCoder.of();
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Cannot find a Coder for data type [%s]", sqlType));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java
new file mode 100644
index 0000000..d819421
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.transform;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.util.Pair;
+
+/**
+ * Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation.
+ */
+public class BeamJoinTransforms {
+
+  /**
+   * A {@code SimpleFunction} that keys a row by its join columns.
+   *
+   * <p>For every entry of {@code joinColumns} the key of the pair is used as the column index
+   * when {@code isLeft} is set, and the value of the pair otherwise. The output pairs a new row
+   * holding just those columns with the untouched input row.
+   */
+  public static class ExtractJoinFields
+      extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
+    private final boolean isLeft;
+    private final List<Pair<Integer, Integer>> joinColumns;
+
+    public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumns) {
+      this.isLeft = isLeft;
+      this.joinColumns = joinColumns;
+    }
+
+    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+      // describe the key row; its field names are placeholders and carry no meaning
+      List<String> names = new ArrayList<>(joinColumns.size());
+      List<Integer> types = new ArrayList<>(joinColumns.size());
+      for (int i = 0; i < joinColumns.size(); i++) {
+        Pair<Integer, Integer> columns = joinColumns.get(i);
+        Integer sourceIdx = isLeft ? columns.getKey() : columns.getValue();
+        names.add("c" + i);
+        types.add(input.getDataType().getFieldsType().get(sourceIdx));
+      }
+      BeamSqlRowType type = BeamSqlRowType.create(names, types);
+
+      // copy the join column values into the key row
+      BeamSqlRow row = new BeamSqlRow(type);
+      for (int i = 0; i < joinColumns.size(); i++) {
+        Pair<Integer, Integer> columns = joinColumns.get(i);
+        Integer sourceIdx = isLeft ? columns.getKey() : columns.getValue();
+        row.addField(i, input.getFieldValue(sourceIdx));
+      }
+      return KV.of(row, input);
+    }
+  }
+
+
+  /**
+   * A {@code DoFn} which implement the sideInput-JOIN.
+   */
+  public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
+    private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView;
+    private final JoinRelType joinType;
+    private final BeamSqlRow rightNullRow;
+    private final boolean swap;
+
+    public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow,
+        PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView,
+        boolean swap) {
+      this.joinType = joinType;
+      this.rightNullRow = rightNullRow;
+      this.sideInputView = sideInputView;
+      this.swap = swap;
+    }
+
+    @ProcessElement public void processElement(ProcessContext context) {
+      BeamSqlRow key = context.element().getKey();
+      BeamSqlRow leftRow = context.element().getValue();
+      Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView);
+      Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key);
+
+      if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) {
+        Iterator<BeamSqlRow> it = rightRowsIterable.iterator();
+        while (it.hasNext()) {
+          context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap));
+        }
+      } else {
+        if (joinType == JoinRelType.LEFT) {
+          context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap));
+        }
+      }
+    }
+  }
+
+
+  /**
+   * A {@code SimpleFunction} to combine two rows into one.
+   *
+   * <p>The outer key of the input is ignored; the inner pair's key and value are concatenated,
+   * in that order.
+   */
+  public static class JoinParts2WholeRow
+      extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> {
+    @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) {
+      KV<BeamSqlRow, BeamSqlRow> joined = input.getValue();
+      return combineTwoRowsIntoOne(joined.getKey(), joined.getValue(), false);
+    }
+  }
+
+  /**
+   * Combines two rows into one wide row.
+   *
+   * @param swap when {@code true}, {@code rightRow}'s fields come first in the result
+   */
+  private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow,
+      BeamSqlRow rightRow, boolean swap) {
+    return swap
+        ? combineTwoRowsIntoOneHelper(rightRow, leftRow)
+        : combineTwoRowsIntoOneHelper(leftRow, rightRow);
+  }
+
+  /**
+   * Builds a new row whose fields are those of {@code leftRow} followed by those of
+   * {@code rightRow}; names, types and values are all concatenated in that order.
+   */
+  private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow,
+      BeamSqlRow rightRow) {
+    int leftWidth = leftRow.size();
+    int rightWidth = rightRow.size();
+
+    // the combined type: left fields first, right fields after
+    List<String> names = new ArrayList<>(leftWidth + rightWidth);
+    List<Integer> types = new ArrayList<>(leftWidth + rightWidth);
+    names.addAll(leftRow.getDataType().getFieldsName());
+    names.addAll(rightRow.getDataType().getFieldsName());
+    types.addAll(leftRow.getDataType().getFieldsType());
+    types.addAll(rightRow.getDataType().getFieldsType());
+
+    BeamSqlRow wideRow = new BeamSqlRow(BeamSqlRowType.create(names, types));
+
+    // the combined values, in the same order as the type
+    for (int col = 0; col < leftWidth; col++) {
+      wideRow.addField(col, leftRow.getFieldValue(col));
+    }
+    for (int col = 0; col < rightWidth; col++) {
+      wideRow.addField(leftWidth + col, rightRow.getFieldValue(col));
+    }
+
+    return wideRow;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java
new file mode 100644
index 0000000..8546160
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.transform;
+
+import java.util.Iterator;
+import org.apache.beam.sdk.extensions.sql.rel.BeamSetOperatorRelBase;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Collections of {@code PTransform} and {@code DoFn} used to perform Set operations.
+ */
+public abstract class BeamSetOperatorsTransforms {
+  /**
+   * Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>}.
+   */
+  public static class BeamSqlRow2KvFn extends
+      SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
+    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+      return KV.of(input, input);
+    }
+  }
+
+  /**
+   * Filter function used for Set operators.
+   */
+  public static class SetOperatorFilteringDoFn extends
+      DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> {
+    private TupleTag<BeamSqlRow> leftTag;
+    private TupleTag<BeamSqlRow> rightTag;
+    private BeamSetOperatorRelBase.OpType opType;
+    // ALL?
+    private boolean all;
+
+    public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag,
+        BeamSetOperatorRelBase.OpType opType, boolean all) {
+      this.leftTag = leftTag;
+      this.rightTag = rightTag;
+      this.opType = opType;
+      this.all = all;
+    }
+
+    @ProcessElement public void processElement(ProcessContext ctx) {
+      CoGbkResult coGbkResult = ctx.element().getValue();
+      Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag);
+      Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag);
+      switch (opType) {
+        case UNION:
+          if (all) {
+            // output both left & right
+            Iterator<BeamSqlRow> iter = leftRows.iterator();
+            while (iter.hasNext()) {
+              ctx.output(iter.next());
+            }
+            iter = rightRows.iterator();
+            while (iter.hasNext()) {
+              ctx.output(iter.next());
+            }
+          } else {
+            // only output the key
+            ctx.output(ctx.element().getKey());
+          }
+          break;
+        case INTERSECT:
+          if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
+            if (all) {
+              for (BeamSqlRow leftRow : leftRows) {
+                ctx.output(leftRow);
+              }
+            } else {
+              ctx.output(ctx.element().getKey());
+            }
+          }
+          break;
+        case MINUS:
+          if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) {
+            Iterator<BeamSqlRow> iter = leftRows.iterator();
+            if (all) {
+              // output all
+              while (iter.hasNext()) {
+                ctx.output(iter.next());
+              }
+            } else {
+              // only output one
+              ctx.output(iter.next());
+            }
+          }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java
new file mode 100644
index 0000000..372c38c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.transform;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step.
+ *
+ * <p>Each input row is evaluated with the supplied {@link BeamSqlExpressionExecutor}; the row is
+ * emitted unchanged when the first evaluated value is {@code true}.
+ */
+public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> {
+
+  private String stepName;
+  private BeamSqlExpressionExecutor executor;
+
+  public BeamSqlFilterFn(String stepName, BeamSqlExpressionExecutor executor) {
+    this.stepName = stepName;
+    this.executor = executor;
+  }
+
+  /** Prepares the expression executor before any element is processed. */
+  @Setup
+  public void setup() {
+    executor.prepare();
+  }
+
+  /** Evaluates the filter condition on the element and forwards the element if it holds. */
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    BeamSqlRow row = c.element();
+    Boolean keep = (Boolean) executor.execute(row).get(0);
+    if (keep) {
+      c.output(row);
+    }
+  }
+
+  /** Closes the expression executor. */
+  @Teardown
+  public void close() {
+    executor.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java
new file mode 100644
index 0000000..9221947
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.transform;
+
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * A test PTransform to display output in console.
+ *
+ */
+public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> {
+
+  private String stepName;
+
+  public BeamSqlOutputToConsoleFn(String stepName) {
+    super();
+    this.stepName = stepName;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    System.out.println("Output: " + c.element().getDataValues());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java
new file mode 100644
index 0000000..af398ea
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.transform;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ *
+ * {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step.
+ *
+ */
+public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
+  private String stepName;
+  private BeamSqlExpressionExecutor executor;
+  private BeamSqlRowType outputRowType;
+
+  public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
+      BeamSqlRowType outputRowType) {
+    super();
+    this.stepName = stepName;
+    this.executor = executor;
+    this.outputRowType = outputRowType;
+  }
+
+  @Setup
+  public void setup() {
+    executor.prepare();
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c, BoundedWindow window) {
+    BeamSqlRow inputRow = c.element();
+    List<Object> results = executor.execute(inputRow);
+
+    BeamSqlRow outRow = new BeamSqlRow(outputRowType);
+    outRow.updateWindowRange(inputRow, window);
+
+    for (int idx = 0; idx < results.size(); ++idx) {
+      BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));
+    }
+
+    c.output(outRow);
+  }
+
+  @Teardown
+  public void close() {
+    executor.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java
new file mode 100644
index 0000000..7797ddf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSql pipeline.
+ */
+package org.apache.beam.sdk.extensions.sql.transform;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java
new file mode 100644
index 0000000..9970955
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.utils;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utility methods for Calcite related operations.
+ */
+public class CalciteUtils {
+  private static final Map<Integer, SqlTypeName> JAVA_TO_CALCITE_MAPPING = new HashMap<>();
+  private static final Map<SqlTypeName, Integer> CALCITE_TO_JAVA_MAPPING = new HashMap<>();
+  static {
+    JAVA_TO_CALCITE_MAPPING.put(Types.TINYINT, SqlTypeName.TINYINT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.SMALLINT, SqlTypeName.SMALLINT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.INTEGER, SqlTypeName.INTEGER);
+    JAVA_TO_CALCITE_MAPPING.put(Types.BIGINT, SqlTypeName.BIGINT);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.FLOAT, SqlTypeName.FLOAT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.DOUBLE, SqlTypeName.DOUBLE);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.DECIMAL, SqlTypeName.DECIMAL);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR);
+    JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE);
+    JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME);
+    JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.BOOLEAN, SqlTypeName.BOOLEAN);
+
+    for (Map.Entry<Integer, SqlTypeName> pair : JAVA_TO_CALCITE_MAPPING.entrySet()) {
+      CALCITE_TO_JAVA_MAPPING.put(pair.getValue(), pair.getKey());
+    }
+  }
+
+  /**
+   * Get the corresponding {@code SqlTypeName} for an integer sql type.
+   */
+  public static SqlTypeName toCalciteType(int type) {
+    return JAVA_TO_CALCITE_MAPPING.get(type);
+  }
+
+  /**
+   * Get the integer sql type from Calcite {@code SqlTypeName}.
+   */
+  public static Integer toJavaType(SqlTypeName typeName) {
+    return CALCITE_TO_JAVA_MAPPING.get(typeName);
+  }
+
+  /**
+   * Get the {@code SqlTypeName} for the specified column of a table.
+   */
+  public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) {
+    return toCalciteType(schema.getFieldsType().get(index));
+  }
+
+  /**
+   * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
+   */
+  public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) {
+    List<String> fieldNames = new ArrayList<>();
+    List<Integer> fieldTypes = new ArrayList<>();
+    for (RelDataTypeField f : tableInfo.getFieldList()) {
+      fieldNames.add(f.getName());
+      fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
+    }
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
+  }
+
+  /**
+   * Create an instance of {@code RelDataType} so it can be used to create a table.
+   */
+  public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) {
+    return new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a) {
+        RelDataTypeFactory.FieldInfoBuilder builder = a.builder();
+        for (int idx = 0; idx < that.getFieldsName().size(); ++idx) {
+          builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx)));
+        }
+        return builder.build();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java
new file mode 100644
index 0000000..e4d6148
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes.
+ */
+package org.apache.beam.sdk.extensions.sql.utils;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java
deleted file mode 100644
index 922931c..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlApiSurfaceTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql;
-
-import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.Set;
-import org.apache.beam.sdk.util.ApiSurface;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Surface test for BeamSql api.
- */
-@RunWith(JUnit4.class)
-public class BeamSqlApiSurfaceTest {
-  @Test
-  public void testSdkApiSurface() throws Exception {
-
-    @SuppressWarnings("unchecked")
-    final Set<String> allowed =
-        ImmutableSet.of(
-            "org.apache.beam",
-            "org.joda.time",
-            "org.apache.commons.csv");
-
-    ApiSurface surface = ApiSurface
-        .ofClass(BeamSqlCli.class)
-        .includingClass(BeamSql.class)
-        .includingClass(BeamSqlEnv.class)
-        .includingPackage("org.apache.beam.dsls.sql.schema",
-            getClass().getClassLoader())
-        .pruningPrefix("java")
-        .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*Test")
-        .pruningPattern("org[.]apache[.]beam[.]dsls[.]sql[.].*TestBase");
-
-    assertThat(surface, containsOnlyPackages(allowed));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
deleted file mode 100644
index a142514..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql;
-
-import java.sql.Types;
-import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.junit.Test;
-
-/**
- * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window
- * with BOUNDED PCollection.
- */
-public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
-  /**
-   * GROUP-BY with single aggregation function with bounded PCollection.
-   */
-  @Test
-  public void testAggregationWithoutWindowWithBounded() throws Exception {
-    runAggregationWithoutWindow(boundedInput1);
-  }
-
-  /**
-   * GROUP-BY with single aggregation function with unbounded PCollection.
-   */
-  @Test
-  public void testAggregationWithoutWindowWithUnbounded() throws Exception {
-    runAggregationWithoutWindow(unboundedInput1);
-  }
-
-  private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
-
-    PCollection<BeamSqlRow> result =
-        input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
-
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"),
-        Arrays.asList(Types.INTEGER, Types.BIGINT));
-
-    BeamSqlRow record = new BeamSqlRow(resultType);
-    record.addField("f_int2", 0);
-    record.addField("size", 4L);
-
-    PAssert.that(result).containsInAnyOrder(record);
-
-    pipeline.run().waitUntilFinish();
-  }
-
-  /**
-   * GROUP-BY with multiple aggregation functions with bounded PCollection.
-   */
-  @Test
-  public void testAggregationFunctionsWithBounded() throws Exception{
-    runAggregationFunctions(boundedInput1);
-  }
-
-  /**
-   * GROUP-BY with multiple aggregation functions with unbounded PCollection.
-   */
-  @Test
-  public void testAggregationFunctionsWithUnbounded() throws Exception{
-    runAggregationFunctions(unboundedInput1);
-  }
-
-  private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{
-    String sql = "select f_int2, count(*) as size, "
-        + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1,"
-        + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2,"
-        + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3,"
-        + "sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4,"
-        + "sum(f_double) as sum5, avg(f_double) as avg5, "
-        + "max(f_double) as max5, min(f_double) as min5,"
-        + "max(f_timestamp) as max6, min(f_timestamp) as min6 "
-        + "FROM TABLE_A group by f_int2";
-
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
-        .apply("testAggregationFunctions", BeamSql.query(sql));
-
-    BeamSqlRowType resultType = BeamSqlRowType.create(
-        Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
-            "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
-            "max5", "min5", "max6", "min6"),
-        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
-            Types.BIGINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
-            Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.FLOAT, Types.FLOAT,
-            Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
-            Types.TIMESTAMP, Types.TIMESTAMP));
-
-    BeamSqlRow record = new BeamSqlRow(resultType);
-    record.addField("f_int2", 0);
-    record.addField("size", 4L);
-
-    record.addField("sum1", 10000L);
-    record.addField("avg1", 2500L);
-    record.addField("max1", 4000L);
-    record.addField("min1", 1000L);
-
-    record.addField("sum2", (short) 10);
-    record.addField("avg2", (short) 2);
-    record.addField("max2", (short) 4);
-    record.addField("min2", (short) 1);
-
-    record.addField("sum3", (byte) 10);
-    record.addField("avg3", (byte) 2);
-    record.addField("max3", (byte) 4);
-    record.addField("min3", (byte) 1);
-
-    record.addField("sum4", 10.0F);
-    record.addField("avg4", 2.5F);
-    record.addField("max4", 4.0F);
-    record.addField("min4", 1.0F);
-
-    record.addField("sum5", 10.0);
-    record.addField("avg5", 2.5);
-    record.addField("max5", 4.0);
-    record.addField("min5", 1.0);
-
-    record.addField("max6", FORMAT.parse("2017-01-01 02:04:03"));
-    record.addField("min6", FORMAT.parse("2017-01-01 01:01:03"));
-
-    PAssert.that(result).containsInAnyOrder(record);
-
-    pipeline.run().waitUntilFinish();
-  }
-
-  /**
-   * Implicit GROUP-BY with DISTINCT with bounded PCollection.
-   */
-  @Test
-  public void testDistinctWithBounded() throws Exception {
-    runDistinct(boundedInput1);
-  }
-
-  /**
-   * Implicit GROUP-BY with DISTINCT with unbounded PCollection.
-   */
-  @Test
-  public void testDistinctWithUnbounded() throws Exception {
-    runDistinct(unboundedInput1);
-  }
-
-  private void runDistinct(PCollection<BeamSqlRow> input) throws Exception {
-    String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
-
-    PCollection<BeamSqlRow> result =
-        input.apply("testDistinct", BeamSql.simpleQuery(sql));
-
-    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
-        Arrays.asList(Types.INTEGER, Types.BIGINT));
-
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
-    record1.addField("f_int", 1);
-    record1.addField("f_long", 1000L);
-
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
-    record2.addField("f_int", 2);
-    record2.addField("f_long", 2000L);
-
-    BeamSqlRow record3 = new BeamSqlRow(resultType);
-    record3.addField("f_int", 3);
-    record3.addField("f_long", 3000L);
-
-    BeamSqlRow record4 = new BeamSqlRow(resultType);
-    record4.addField("f_int", 4);
-    record4.addField("f_long", 4000L);
-
-    PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
-
-    pipeline.run().waitUntilFinish();
-  }
-
-  /**
-   * GROUP-BY with TUMBLE window(aka fix_time_window) with bounded PCollection.
-   */
-  @Test
-  public void testTumbleWindowWithBounded() throws Exception {
-    runTumbleWindow(boundedInput1);
-  }
-
-  /**
-   * GROUP-BY with TUMBLE window(aka fix_time_window) with unbounded PCollection.
-   */
-  @Test
-  public void testTumbleWindowWithUnbounded() throws Exception {
-    runTumbleWindow(unboundedInput1);
-  }
-
-  private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
-        + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
-        + " FROM TABLE_A"
-        + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
-        .apply("testTumbleWindow", BeamSql.query(sql));
-
-    BeamSqlRowType resultType = BeamSqlRowType.create(
-        Arrays.asList("f_int2", "size", "window_start"),
-        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
-
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
-    record1.addField("f_int2", 0);
-    record1.addField("size", 3L);
-    record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
-    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
-
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
-    record2.addField("f_int2", 0);
-    record2.addField("size", 1L);
-    record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
-    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
-    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
-
-    PAssert.that(result).containsInAnyOrder(record1, record2);
-
-    pipeline.run().waitUntilFinish();
-  }
-
-  /**
-   * GROUP-BY with HOP window(aka sliding_window) with bounded PCollection.
-   */
-  @Test
-  public void testHopWindowWithBounded() throws Exception {
-    runHopWindow(boundedInput1);
-  }
-
-  /**
-   * GROUP-BY with HOP window(aka sliding_window) with unbounded PCollection.
-   */
-  @Test
-  public void testHopWindowWithUnbounded() throws Exception {
-    runHopWindow(unboundedInput1);
-  }
-
-  private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
-        + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
-        + " FROM PCOLLECTION"
-        + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
-    PCollection<BeamSqlRow> result =
-        input.apply("testHopWindow", BeamSql.simpleQuery(sql));
-
-    BeamSqlRowType resultType = BeamSqlRowType.create(
-        Arrays.asList("f_int2", "size", "window_start"),
-        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
-
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
-    record1.addField("f_int2", 0);
-    record1.addField("size", 3L);
-    record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
-    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime()));
-    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
-
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
-    record2.addField("f_int2", 0);
-    record2.addField("size", 3L);
-    record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
-    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
-
-    BeamSqlRow record3 = new BeamSqlRow(resultType);
-    record3.addField("f_int2", 0);
-    record3.addField("size", 1L);
-    record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
-    record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
-    record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime()));
-
-    BeamSqlRow record4 = new BeamSqlRow(resultType);
-    record4.addField("f_int2", 0);
-    record4.addField("size", 1L);
-    record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
-    record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
-    record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime()));
-
-    PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
-
-    pipeline.run().waitUntilFinish();
-  }
-
-  /**
-   * GROUP-BY with SESSION window with bounded PCollection.
-   */
-  @Test
-  public void testSessionWindowWithBounded() throws Exception {
-    runSessionWindow(boundedInput1);
-  }
-
-  /**
-   * GROUP-BY with SESSION window with unbounded PCollection.
-   */
-  @Test
-  public void testSessionWindowWithUnbounded() throws Exception {
-    runSessionWindow(unboundedInput1);
-  }
-
-  private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
-        + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
-        + " FROM TABLE_A"
-        + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
-        .apply("testSessionWindow", BeamSql.query(sql));
-
-    BeamSqlRowType resultType = BeamSqlRowType.create(
-        Arrays.asList("f_int2", "size", "window_start"),
-        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
-
-    BeamSqlRow record1 = new BeamSqlRow(resultType);
-    record1.addField("f_int2", 0);
-    record1.addField("size", 3L);
-    record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
-    record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime()));
-    record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime()));
-
-    BeamSqlRow record2 = new BeamSqlRow(resultType);
-    record2.addField("f_int2", 0);
-    record2.addField("size", 1L);
-    record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
-    record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime()));
-    record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime()));
-
-    PAssert.that(result).containsInAnyOrder(record1, record2);
-
-    pipeline.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testWindowOnNonTimestampField() throws Exception {
-    exceptions.expect(IllegalStateException.class);
-    exceptions.expectMessage(
-        "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'");
-    pipeline.enableAbandonedNodeEnforcement(false);
-
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
-        + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
-    PCollection<BeamSqlRow> result =
-        PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
-        .apply("testWindowOnNonTimestampField", BeamSql.query(sql));
-
-    pipeline.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testUnsupportedDistinct() throws Exception {
-    exceptions.expect(IllegalStateException.class);
-    exceptions.expectMessage("Encountered \"*\"");
-    pipeline.enableAbandonedNodeEnforcement(false);
-
-    String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
-
-    PCollection<BeamSqlRow> result =
-        boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
-
-    pipeline.run().waitUntilFinish();
-  }
-}