You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:12 UTC

[49/59] beam git commit: move all implementation classes/packages into impl package

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java
deleted file mode 100644
index 363cf3b..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.extensions.sql.rel.BeamMinusRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Minus;
-import org.apache.calcite.rel.logical.LogicalMinus;
-
-/**
- * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}.
- */
-public class BeamMinusRule extends ConverterRule {
-  public static final BeamMinusRule INSTANCE = new BeamMinusRule();
-  private BeamMinusRule() {
-    super(LogicalMinus.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamMinusRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Minus minus = (Minus) rel;
-    final List<RelNode> inputs = minus.getInputs();
-    return new BeamMinusRel(
-        minus.getCluster(),
-        minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convertList(inputs, BeamLogicalConvention.INSTANCE),
-        minus.all
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java
deleted file mode 100644
index 4f2f8c9..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
-
-/**
- * A {@code ConverterRule} to replace {@link Project} with
- * {@link BeamProjectRel}.
- *
- */
-public class BeamProjectRule extends ConverterRule {
-  public static final BeamProjectRule INSTANCE = new BeamProjectRule();
-
-  private BeamProjectRule() {
-    super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final Project project = (Project) rel;
-    final RelNode input = project.getInput();
-
-    return new BeamProjectRel(project.getCluster(),
-        project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        project.getProjects(), project.getRowType());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java
deleted file mode 100644
index e104d37..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.extensions.sql.rel.BeamSortRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.logical.LogicalSort;
-
-/**
- * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}.
- */
-public class BeamSortRule extends ConverterRule {
-  public static final BeamSortRule INSTANCE = new BeamSortRule();
-  private BeamSortRule() {
-    super(LogicalSort.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamSortRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Sort sort = (Sort) rel;
-    final RelNode input = sort.getInput();
-    return new BeamSortRel(
-        sort.getCluster(),
-        sort.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        sort.getCollation(),
-        sort.offset,
-        sort.fetch
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java
deleted file mode 100644
index 975ccbc..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.extensions.sql.rel.BeamUnionRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Union;
-import org.apache.calcite.rel.logical.LogicalUnion;
-
-/**
- * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with
- * {@link BeamUnionRule}.
- */
-public class BeamUnionRule extends ConverterRule {
-  public static final BeamUnionRule INSTANCE = new BeamUnionRule();
-  private BeamUnionRule() {
-    super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
-        "BeamUnionRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Union union = (Union) rel;
-
-    return new BeamUnionRel(
-        union.getCluster(),
-        union.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convertList(union.getInputs(), BeamLogicalConvention.INSTANCE),
-        union.all
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java
deleted file mode 100644
index 86a8f72..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.extensions.sql.rel.BeamValuesRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Values;
-import org.apache.calcite.rel.logical.LogicalValues;
-
-/**
- * {@code ConverterRule} to replace {@code Values} with {@code BeamValuesRel}.
- */
-public class BeamValuesRule extends ConverterRule {
-  public static final BeamValuesRule INSTANCE = new BeamValuesRule();
-  private BeamValuesRule() {
-    super(LogicalValues.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamValuesRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Values values = (Values) rel;
-    return new BeamValuesRel(
-        values.getCluster(),
-        values.getRowType(),
-        values.getTuples(),
-        values.getTraitSet().replace(BeamLogicalConvention.INSTANCE)
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
deleted file mode 100644
index f57cdee..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.calcite.plan.RelOptRule} to generate
- * {@link org.apache.beam.sdk.extensions.sql.rel.BeamRelNode}.
- */
-package org.apache.beam.sdk.extensions.sql.rule;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
index 616e9f3..2e0efe8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.calcite.sql.type.SqlTypeName;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
index 39e2fd3..bf097d4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 
 /**
  *  A {@link Coder} encodes {@link BeamSqlRow}.

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index 53e8483..c769928 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.math.BigDecimal;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.NlsString;
 import org.apache.commons.csv.CSVFormat;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java
deleted file mode 100644
index 6a27da8..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamAggregationTransforms.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.transform;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.KV;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.joda.time.Instant;
-
-/**
- * Collections of {@code PTransform} and {@code DoFn} used to perform GROUP-BY operation.
- */
-public class BeamAggregationTransforms implements Serializable{
-  /**
-   * Merge KV to single record.
-   */
-  public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
-    private BeamSqlRowType outRowType;
-    private List<String> aggFieldNames;
-    private int windowStartFieldIdx;
-
-    public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList
-        , int windowStartFieldIdx) {
-      this.outRowType = outRowType;
-      this.aggFieldNames = new ArrayList<>();
-      for (AggregateCall ac : aggList) {
-        aggFieldNames.add(ac.getName());
-      }
-      this.windowStartFieldIdx = windowStartFieldIdx;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) {
-      BeamSqlRow outRecord = new BeamSqlRow(outRowType);
-      outRecord.updateWindowRange(c.element().getKey(), window);
-
-      KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
-      for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
-        outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
-      }
-      for (int idx = 0; idx < aggFieldNames.size(); ++idx) {
-        outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx));
-      }
-      if (windowStartFieldIdx != -1) {
-        outRecord.addField(windowStartFieldIdx, outRecord.getWindowStart().toDate());
-      }
-
-      c.output(outRecord);
-    }
-  }
-
-  /**
-   * extract group-by fields.
-   */
-  public static class AggregationGroupByKeyFn
-      implements SerializableFunction<BeamSqlRow, BeamSqlRow> {
-    private List<Integer> groupByKeys;
-
-    public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
-      this.groupByKeys = new ArrayList<>();
-      for (int i : groupSet.asList()) {
-        if (i != windowFieldIdx) {
-          groupByKeys.add(i);
-        }
-      }
-    }
-
-    @Override
-    public BeamSqlRow apply(BeamSqlRow input) {
-      BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType());
-      BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
-      keyOfRecord.updateWindowRange(input, null);
-
-      for (int idx = 0; idx < groupByKeys.size(); ++idx) {
-        keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
-      }
-      return keyOfRecord;
-    }
-
-    private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
-      List<String> fieldNames = new ArrayList<>();
-      List<Integer> fieldTypes = new ArrayList<>();
-      for (int idx : groupByKeys) {
-        fieldNames.add(dataType.getFieldsName().get(idx));
-        fieldTypes.add(dataType.getFieldsType().get(idx));
-      }
-      return BeamSqlRowType.create(fieldNames, fieldTypes);
-    }
-  }
-
-  /**
-   * Assign event timestamp.
-   */
-  public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> {
-    private int windowFieldIdx = -1;
-
-    public WindowTimestampFn(int windowFieldIdx) {
-      super();
-      this.windowFieldIdx = windowFieldIdx;
-    }
-
-    @Override
-    public Instant apply(BeamSqlRow input) {
-      return new Instant(input.getDate(windowFieldIdx).getTime());
-    }
-  }
-
-  /**
-   * An adaptor class to invoke Calcite UDAF instances in Beam {@code CombineFn}.
-   */
-  public static class AggregationAdaptor
-    extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> {
-    private List<BeamSqlUdaf> aggregators;
-    private List<BeamSqlExpression> sourceFieldExps;
-    private BeamSqlRowType finalRowType;
-
-    public AggregationAdaptor(List<AggregateCall> aggregationCalls,
-        BeamSqlRowType sourceRowType) {
-      aggregators = new ArrayList<>();
-      sourceFieldExps = new ArrayList<>();
-      List<String> outFieldsName = new ArrayList<>();
-      List<Integer> outFieldsType = new ArrayList<>();
-      for (AggregateCall call : aggregationCalls) {
-        int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0;
-        BeamSqlExpression sourceExp = new BeamSqlInputRefExpression(
-            CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex);
-        sourceFieldExps.add(sourceExp);
-
-        outFieldsName.add(call.name);
-        int outFieldType = CalciteUtils.toJavaType(call.type.getSqlTypeName());
-        outFieldsType.add(outFieldType);
-
-        switch (call.getAggregation().getName()) {
-          case "COUNT":
-            aggregators.add(new BeamBuiltinAggregations.Count());
-            break;
-          case "MAX":
-            aggregators.add(BeamBuiltinAggregations.Max.create(call.type.getSqlTypeName()));
-            break;
-          case "MIN":
-            aggregators.add(BeamBuiltinAggregations.Min.create(call.type.getSqlTypeName()));
-            break;
-          case "SUM":
-            aggregators.add(BeamBuiltinAggregations.Sum.create(call.type.getSqlTypeName()));
-            break;
-          case "AVG":
-            aggregators.add(BeamBuiltinAggregations.Avg.create(call.type.getSqlTypeName()));
-            break;
-          default:
-            if (call.getAggregation() instanceof SqlUserDefinedAggFunction) {
-              // handle UDAF.
-              SqlUserDefinedAggFunction udaf = (SqlUserDefinedAggFunction) call.getAggregation();
-              AggregateFunctionImpl fn = (AggregateFunctionImpl) udaf.function;
-              try {
-                aggregators.add((BeamSqlUdaf) fn.declaringClass.newInstance());
-              } catch (Exception e) {
-                throw new IllegalStateException(e);
-              }
-            } else {
-              throw new UnsupportedOperationException(
-                  String.format("Aggregator [%s] is not supported",
-                  call.getAggregation().getName()));
-            }
-          break;
-        }
-      }
-      finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType);
-    }
-    @Override
-    public AggregationAccumulator createAccumulator() {
-      AggregationAccumulator initialAccu = new AggregationAccumulator();
-      for (BeamSqlUdaf agg : aggregators) {
-        initialAccu.accumulatorElements.add(agg.init());
-      }
-      return initialAccu;
-    }
-    @Override
-    public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamSqlRow input) {
-      AggregationAccumulator deltaAcc = new AggregationAccumulator();
-      for (int idx = 0; idx < aggregators.size(); ++idx) {
-        deltaAcc.accumulatorElements.add(
-            aggregators.get(idx).add(accumulator.accumulatorElements.get(idx),
-            sourceFieldExps.get(idx).evaluate(input).getValue()));
-      }
-      return deltaAcc;
-    }
-    @Override
-    public AggregationAccumulator mergeAccumulators(Iterable<AggregationAccumulator> accumulators) {
-      AggregationAccumulator deltaAcc = new AggregationAccumulator();
-      for (int idx = 0; idx < aggregators.size(); ++idx) {
-        List accs = new ArrayList<>();
-        Iterator<AggregationAccumulator> ite = accumulators.iterator();
-        while (ite.hasNext()) {
-          accs.add(ite.next().accumulatorElements.get(idx));
-        }
-        deltaAcc.accumulatorElements.add(aggregators.get(idx).merge(accs));
-      }
-      return deltaAcc;
-    }
-    @Override
-    public BeamSqlRow extractOutput(AggregationAccumulator accumulator) {
-      BeamSqlRow result = new BeamSqlRow(finalRowType);
-      for (int idx = 0; idx < aggregators.size(); ++idx) {
-        result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
-      }
-      return result;
-    }
-    @Override
-    public Coder<AggregationAccumulator> getAccumulatorCoder(
-        CoderRegistry registry, Coder<BeamSqlRow> inputCoder)
-        throws CannotProvideCoderException {
-      registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of());
-      List<Coder> aggAccuCoderList = new ArrayList<>();
-      for (BeamSqlUdaf udaf : aggregators) {
-        aggAccuCoderList.add(udaf.getAccumulatorCoder(registry));
-      }
-      return new AggregationAccumulatorCoder(aggAccuCoderList);
-    }
-  }
-
-  /**
-   * A class to holder varied accumulator objects.
-   */
-  public static class AggregationAccumulator{
-    private List accumulatorElements = new ArrayList<>();
-  }
-
-  /**
-   * Coder for {@link AggregationAccumulator}.
-   */
-  public static class AggregationAccumulatorCoder extends CustomCoder<AggregationAccumulator>{
-    private VarIntCoder sizeCoder = VarIntCoder.of();
-    private List<Coder> elementCoders;
-
-    public AggregationAccumulatorCoder(List<Coder> elementCoders) {
-      this.elementCoders = elementCoders;
-    }
-
-    @Override
-    public void encode(AggregationAccumulator value, OutputStream outStream)
-        throws CoderException, IOException {
-      sizeCoder.encode(value.accumulatorElements.size(), outStream);
-      for (int idx = 0; idx < value.accumulatorElements.size(); ++idx) {
-        elementCoders.get(idx).encode(value.accumulatorElements.get(idx), outStream);
-      }
-    }
-
-    @Override
-    public AggregationAccumulator decode(InputStream inStream) throws CoderException, IOException {
-      AggregationAccumulator accu = new AggregationAccumulator();
-      int size = sizeCoder.decode(inStream);
-      for (int idx = 0; idx < size; ++idx) {
-        accu.accumulatorElements.add(elementCoders.get(idx).decode(inStream));
-      }
-      return accu;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java
deleted file mode 100644
index 1183668..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamBuiltinAggregations.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.transform;
-
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.Iterator;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.ByteCoder;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.DoubleCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
-import org.apache.beam.sdk.values.KV;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Built-in aggregations functions for COUNT/MAX/MIN/SUM/AVG.
- */
-class BeamBuiltinAggregations {
-  /**
-   * Built-in aggregation for COUNT.
-   */
-  public static final class Count<T> extends BeamSqlUdaf<T, Long, Long> {
-    public Count() {}
-
-    @Override
-    public Long init() {
-      return 0L;
-    }
-
-    @Override
-    public Long add(Long accumulator, T input) {
-      return accumulator + 1;
-    }
-
-    @Override
-    public Long merge(Iterable<Long> accumulators) {
-      long v = 0L;
-      Iterator<Long> ite = accumulators.iterator();
-      while (ite.hasNext()) {
-        v += ite.next();
-      }
-      return v;
-    }
-
-    @Override
-    public Long result(Long accumulator) {
-      return accumulator;
-    }
-  }
-
-  /**
-   * Built-in aggregation for MAX.
-   */
-  public static final class Max<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> {
-    public static Max create(SqlTypeName fieldType) {
-      switch (fieldType) {
-        case INTEGER:
-          return new BeamBuiltinAggregations.Max<Integer>(fieldType);
-        case SMALLINT:
-          return new BeamBuiltinAggregations.Max<Short>(fieldType);
-        case TINYINT:
-          return new BeamBuiltinAggregations.Max<Byte>(fieldType);
-        case BIGINT:
-          return new BeamBuiltinAggregations.Max<Long>(fieldType);
-        case FLOAT:
-          return new BeamBuiltinAggregations.Max<Float>(fieldType);
-        case DOUBLE:
-          return new BeamBuiltinAggregations.Max<Double>(fieldType);
-        case TIMESTAMP:
-          return new BeamBuiltinAggregations.Max<Date>(fieldType);
-        case DECIMAL:
-          return new BeamBuiltinAggregations.Max<BigDecimal>(fieldType);
-        default:
-          throw new UnsupportedOperationException(
-              String.format("[%s] is not support in MAX", fieldType));
-      }
-    }
-
-    private final SqlTypeName fieldType;
-    private Max(SqlTypeName fieldType) {
-      this.fieldType = fieldType;
-    }
-
-    @Override
-    public T init() {
-      return null;
-    }
-
-    @Override
-    public T add(T accumulator, T input) {
-      return (accumulator == null || accumulator.compareTo(input) < 0) ? input : accumulator;
-    }
-
-    @Override
-    public T merge(Iterable<T> accumulators) {
-      Iterator<T> ite = accumulators.iterator();
-      T mergedV = ite.next();
-      while (ite.hasNext()) {
-        T v = ite.next();
-        mergedV = mergedV.compareTo(v) > 0 ? mergedV : v;
-      }
-      return mergedV;
-    }
-
-    @Override
-    public T result(T accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException {
-      return BeamBuiltinAggregations.getSqlTypeCoder(fieldType);
-    }
-  }
-
-  /**
-   * Built-in aggregation for MIN.
-   */
-  public static final class Min<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> {
-    public static Min create(SqlTypeName fieldType) {
-      switch (fieldType) {
-        case INTEGER:
-          return new BeamBuiltinAggregations.Min<Integer>(fieldType);
-        case SMALLINT:
-          return new BeamBuiltinAggregations.Min<Short>(fieldType);
-        case TINYINT:
-          return new BeamBuiltinAggregations.Min<Byte>(fieldType);
-        case BIGINT:
-          return new BeamBuiltinAggregations.Min<Long>(fieldType);
-        case FLOAT:
-          return new BeamBuiltinAggregations.Min<Float>(fieldType);
-        case DOUBLE:
-          return new BeamBuiltinAggregations.Min<Double>(fieldType);
-        case TIMESTAMP:
-          return new BeamBuiltinAggregations.Min<Date>(fieldType);
-        case DECIMAL:
-          return new BeamBuiltinAggregations.Min<BigDecimal>(fieldType);
-        default:
-          throw new UnsupportedOperationException(
-              String.format("[%s] is not support in MIN", fieldType));
-      }
-    }
-
-    private final SqlTypeName fieldType;
-    private Min(SqlTypeName fieldType) {
-      this.fieldType = fieldType;
-    }
-
-    @Override
-    public T init() {
-      return null;
-    }
-
-    @Override
-    public T add(T accumulator, T input) {
-      return (accumulator == null || accumulator.compareTo(input) > 0) ? input : accumulator;
-    }
-
-    @Override
-    public T merge(Iterable<T> accumulators) {
-      Iterator<T> ite = accumulators.iterator();
-      T mergedV = ite.next();
-      while (ite.hasNext()) {
-        T v = ite.next();
-        mergedV = mergedV.compareTo(v) < 0 ? mergedV : v;
-      }
-      return mergedV;
-    }
-
-    @Override
-    public T result(T accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException {
-      return BeamBuiltinAggregations.getSqlTypeCoder(fieldType);
-    }
-  }
-
-  /**
-   * Built-in aggregation for SUM.
-   */
-  public static final class Sum<T> extends BeamSqlUdaf<T, BigDecimal, T> {
-    public static Sum create(SqlTypeName fieldType) {
-      switch (fieldType) {
-        case INTEGER:
-          return new BeamBuiltinAggregations.Sum<Integer>(fieldType);
-        case SMALLINT:
-          return new BeamBuiltinAggregations.Sum<Short>(fieldType);
-        case TINYINT:
-          return new BeamBuiltinAggregations.Sum<Byte>(fieldType);
-        case BIGINT:
-          return new BeamBuiltinAggregations.Sum<Long>(fieldType);
-        case FLOAT:
-          return new BeamBuiltinAggregations.Sum<Float>(fieldType);
-        case DOUBLE:
-          return new BeamBuiltinAggregations.Sum<Double>(fieldType);
-        case TIMESTAMP:
-          return new BeamBuiltinAggregations.Sum<Date>(fieldType);
-        case DECIMAL:
-          return new BeamBuiltinAggregations.Sum<BigDecimal>(fieldType);
-        default:
-          throw new UnsupportedOperationException(
-              String.format("[%s] is not support in SUM", fieldType));
-      }
-    }
-
-    private SqlTypeName fieldType;
-      private Sum(SqlTypeName fieldType) {
-        this.fieldType = fieldType;
-      }
-
-    @Override
-    public BigDecimal init() {
-      return new BigDecimal(0);
-    }
-
-    @Override
-    public BigDecimal add(BigDecimal accumulator, T input) {
-      return accumulator.add(new BigDecimal(input.toString()));
-    }
-
-    @Override
-    public BigDecimal merge(Iterable<BigDecimal> accumulators) {
-      BigDecimal v = new BigDecimal(0);
-      Iterator<BigDecimal> ite = accumulators.iterator();
-      while (ite.hasNext()) {
-        v = v.add(ite.next());
-      }
-      return v;
-    }
-
-    @Override
-    public T result(BigDecimal accumulator) {
-      Object result = null;
-      switch (fieldType) {
-        case INTEGER:
-          result = accumulator.intValue();
-          break;
-        case BIGINT:
-          result = accumulator.longValue();
-          break;
-        case SMALLINT:
-          result = accumulator.shortValue();
-          break;
-        case TINYINT:
-          result = accumulator.byteValue();
-          break;
-        case DOUBLE:
-          result = accumulator.doubleValue();
-          break;
-        case FLOAT:
-          result = accumulator.floatValue();
-          break;
-        case DECIMAL:
-          result = accumulator;
-          break;
-        default:
-          break;
-      }
-      return (T) result;
-    }
-  }
-
-  /**
-   * Built-in aggregation for AVG.
-   */
-  public static final class Avg<T> extends BeamSqlUdaf<T, KV<BigDecimal, Long>, T> {
-    public static Avg create(SqlTypeName fieldType) {
-      switch (fieldType) {
-        case INTEGER:
-          return new BeamBuiltinAggregations.Avg<Integer>(fieldType);
-        case SMALLINT:
-          return new BeamBuiltinAggregations.Avg<Short>(fieldType);
-        case TINYINT:
-          return new BeamBuiltinAggregations.Avg<Byte>(fieldType);
-        case BIGINT:
-          return new BeamBuiltinAggregations.Avg<Long>(fieldType);
-        case FLOAT:
-          return new BeamBuiltinAggregations.Avg<Float>(fieldType);
-        case DOUBLE:
-          return new BeamBuiltinAggregations.Avg<Double>(fieldType);
-        case TIMESTAMP:
-          return new BeamBuiltinAggregations.Avg<Date>(fieldType);
-        case DECIMAL:
-          return new BeamBuiltinAggregations.Avg<BigDecimal>(fieldType);
-        default:
-          throw new UnsupportedOperationException(
-              String.format("[%s] is not support in AVG", fieldType));
-      }
-    }
-
-    private SqlTypeName fieldType;
-      private Avg(SqlTypeName fieldType) {
-        this.fieldType = fieldType;
-      }
-
-    @Override
-    public KV<BigDecimal, Long> init() {
-      return KV.of(new BigDecimal(0), 0L);
-    }
-
-    @Override
-    public KV<BigDecimal, Long> add(KV<BigDecimal, Long> accumulator, T input) {
-      return KV.of(
-              accumulator.getKey().add(new BigDecimal(input.toString())),
-              accumulator.getValue() + 1);
-    }
-
-    @Override
-    public KV<BigDecimal, Long> merge(Iterable<KV<BigDecimal, Long>> accumulators) {
-      BigDecimal v = new BigDecimal(0);
-      long s = 0;
-      Iterator<KV<BigDecimal, Long>> ite = accumulators.iterator();
-      while (ite.hasNext()) {
-        KV<BigDecimal, Long> r = ite.next();
-        v = v.add(r.getKey());
-        s += r.getValue();
-      }
-      return KV.of(v, s);
-    }
-
-    @Override
-    public T result(KV<BigDecimal, Long> accumulator) {
-      BigDecimal decimalAvg = accumulator.getKey().divide(
-          new BigDecimal(accumulator.getValue()));
-      Object result = null;
-      switch (fieldType) {
-        case INTEGER:
-          result = decimalAvg.intValue();
-          break;
-        case BIGINT:
-          result = decimalAvg.longValue();
-          break;
-        case SMALLINT:
-          result = decimalAvg.shortValue();
-          break;
-        case TINYINT:
-          result = decimalAvg.byteValue();
-          break;
-        case DOUBLE:
-          result = decimalAvg.doubleValue();
-          break;
-        case FLOAT:
-          result = decimalAvg.floatValue();
-          break;
-        case DECIMAL:
-          result = decimalAvg;
-          break;
-        default:
-          break;
-      }
-      return (T) result;
-    }
-
-    @Override
-    public Coder<KV<BigDecimal, Long>> getAccumulatorCoder(CoderRegistry registry)
-        throws CannotProvideCoderException {
-      return KvCoder.of(BigDecimalCoder.of(), VarLongCoder.of());
-    }
-  }
-
-  /**
-   * Find {@link Coder} for Beam SQL field types.
-   */
-  private static Coder getSqlTypeCoder(SqlTypeName sqlType) {
-    switch (sqlType) {
-      case INTEGER:
-        return VarIntCoder.of();
-      case SMALLINT:
-        return SerializableCoder.of(Short.class);
-      case TINYINT:
-        return ByteCoder.of();
-      case BIGINT:
-        return VarLongCoder.of();
-      case FLOAT:
-        return SerializableCoder.of(Float.class);
-      case DOUBLE:
-        return DoubleCoder.of();
-      case TIMESTAMP:
-        return SerializableCoder.of(Date.class);
-      case DECIMAL:
-        return BigDecimalCoder.of();
-      default:
-        throw new UnsupportedOperationException(
-            String.format("Cannot find a Coder for data type [%s]", sqlType));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java
deleted file mode 100644
index d819421..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamJoinTransforms.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.transform;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.util.Pair;
-
-/**
- * Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation.
- */
-public class BeamJoinTransforms {
-
-  /**
-   * A {@code SimpleFunction} to extract join fields from the specified row.
-   */
-  public static class ExtractJoinFields
-      extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
-    private final boolean isLeft;
-    private final List<Pair<Integer, Integer>> joinColumns;
-
-    public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumns) {
-      this.isLeft = isLeft;
-      this.joinColumns = joinColumns;
-    }
-
-    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
-      // build the type
-      // the name of the join field is not important
-      List<String> names = new ArrayList<>(joinColumns.size());
-      List<Integer> types = new ArrayList<>(joinColumns.size());
-      for (int i = 0; i < joinColumns.size(); i++) {
-        names.add("c" + i);
-        types.add(isLeft
-            ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) :
-            input.getDataType().getFieldsType().get(joinColumns.get(i).getValue()));
-      }
-      BeamSqlRowType type = BeamSqlRowType.create(names, types);
-
-      // build the row
-      BeamSqlRow row = new BeamSqlRow(type);
-      for (int i = 0; i < joinColumns.size(); i++) {
-        row.addField(i, input
-            .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue()));
-      }
-      return KV.of(row, input);
-    }
-  }
-
-
-  /**
-   * A {@code DoFn} which implement the sideInput-JOIN.
-   */
-  public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
-    private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView;
-    private final JoinRelType joinType;
-    private final BeamSqlRow rightNullRow;
-    private final boolean swap;
-
-    public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow,
-        PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView,
-        boolean swap) {
-      this.joinType = joinType;
-      this.rightNullRow = rightNullRow;
-      this.sideInputView = sideInputView;
-      this.swap = swap;
-    }
-
-    @ProcessElement public void processElement(ProcessContext context) {
-      BeamSqlRow key = context.element().getKey();
-      BeamSqlRow leftRow = context.element().getValue();
-      Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView);
-      Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key);
-
-      if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) {
-        Iterator<BeamSqlRow> it = rightRowsIterable.iterator();
-        while (it.hasNext()) {
-          context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap));
-        }
-      } else {
-        if (joinType == JoinRelType.LEFT) {
-          context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap));
-        }
-      }
-    }
-  }
-
-
-  /**
-   * A {@code SimpleFunction} to combine two rows into one.
-   */
-  public static class JoinParts2WholeRow
-      extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> {
-    @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) {
-      KV<BeamSqlRow, BeamSqlRow> parts = input.getValue();
-      BeamSqlRow leftRow = parts.getKey();
-      BeamSqlRow rightRow = parts.getValue();
-      return combineTwoRowsIntoOne(leftRow, rightRow, false);
-    }
-  }
-
-  /**
-   * As the method name suggests: combine two rows into one wide row.
-   */
-  private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow,
-      BeamSqlRow rightRow, boolean swap) {
-    if (swap) {
-      return combineTwoRowsIntoOneHelper(rightRow, leftRow);
-    } else {
-      return combineTwoRowsIntoOneHelper(leftRow, rightRow);
-    }
-  }
-
-  /**
-   * As the method name suggests: combine two rows into one wide row.
-   */
-  private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow,
-      BeamSqlRow rightRow) {
-    // build the type
-    List<String> names = new ArrayList<>(leftRow.size() + rightRow.size());
-    names.addAll(leftRow.getDataType().getFieldsName());
-    names.addAll(rightRow.getDataType().getFieldsName());
-
-    List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
-    types.addAll(leftRow.getDataType().getFieldsType());
-    types.addAll(rightRow.getDataType().getFieldsType());
-    BeamSqlRowType type = BeamSqlRowType.create(names, types);
-
-    BeamSqlRow row = new BeamSqlRow(type);
-    // build the row
-    for (int i = 0; i < leftRow.size(); i++) {
-      row.addField(i, leftRow.getFieldValue(i));
-    }
-
-    for (int i = 0; i < rightRow.size(); i++) {
-      row.addField(i + leftRow.size(), rightRow.getFieldValue(i));
-    }
-
-    return row;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java
deleted file mode 100644
index 8546160..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSetOperatorsTransforms.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.transform;
-
-import java.util.Iterator;
-import org.apache.beam.sdk.extensions.sql.rel.BeamSetOperatorRelBase;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Collections of {@code PTransform} and {@code DoFn} used to perform Set operations.
- */
-public abstract class BeamSetOperatorsTransforms {
-  /**
-   * Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>}.
-   */
-  public static class BeamSqlRow2KvFn extends
-      SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
-    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
-      return KV.of(input, input);
-    }
-  }
-
-  /**
-   * Filter function used for Set operators.
-   */
-  public static class SetOperatorFilteringDoFn extends
-      DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> {
-    private TupleTag<BeamSqlRow> leftTag;
-    private TupleTag<BeamSqlRow> rightTag;
-    private BeamSetOperatorRelBase.OpType opType;
-    // ALL?
-    private boolean all;
-
-    public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag,
-        BeamSetOperatorRelBase.OpType opType, boolean all) {
-      this.leftTag = leftTag;
-      this.rightTag = rightTag;
-      this.opType = opType;
-      this.all = all;
-    }
-
-    @ProcessElement public void processElement(ProcessContext ctx) {
-      CoGbkResult coGbkResult = ctx.element().getValue();
-      Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag);
-      Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag);
-      switch (opType) {
-        case UNION:
-          if (all) {
-            // output both left & right
-            Iterator<BeamSqlRow> iter = leftRows.iterator();
-            while (iter.hasNext()) {
-              ctx.output(iter.next());
-            }
-            iter = rightRows.iterator();
-            while (iter.hasNext()) {
-              ctx.output(iter.next());
-            }
-          } else {
-            // only output the key
-            ctx.output(ctx.element().getKey());
-          }
-          break;
-        case INTERSECT:
-          if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
-            if (all) {
-              for (BeamSqlRow leftRow : leftRows) {
-                ctx.output(leftRow);
-              }
-            } else {
-              ctx.output(ctx.element().getKey());
-            }
-          }
-          break;
-        case MINUS:
-          if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) {
-            Iterator<BeamSqlRow> iter = leftRows.iterator();
-            if (all) {
-              // output all
-              while (iter.hasNext()) {
-                ctx.output(iter.next());
-              }
-            } else {
-              // only output one
-              ctx.output(iter.next());
-            }
-          }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java
deleted file mode 100644
index 372c38c..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlFilterFn.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.transform;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor;
-import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step.
- *
- */
-public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> {
-
-  private String stepName;
-  private BeamSqlExpressionExecutor executor;
-
-  public BeamSqlFilterFn(String stepName, BeamSqlExpressionExecutor executor) {
-    super();
-    this.stepName = stepName;
-    this.executor = executor;
-  }
-
-  @Setup
-  public void setup() {
-    executor.prepare();
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-    BeamSqlRow in = c.element();
-
-    List<Object> result = executor.execute(in);
-
-    if ((Boolean) result.get(0)) {
-      c.output(in);
-    }
-  }
-
-  @Teardown
-  public void close() {
-    executor.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java
deleted file mode 100644
index 9221947..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlOutputToConsoleFn.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.transform;
-
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * A test PTransform to display output in console.
- *
- */
-public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> {
-
-  private String stepName;
-
-  public BeamSqlOutputToConsoleFn(String stepName) {
-    super();
-    this.stepName = stepName;
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-    System.out.println("Output: " + c.element().getDataValues());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java
deleted file mode 100644
index af398ea..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/BeamSqlProjectFn.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.transform;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor;
-import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-
-/**
- *
- * {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step.
- *
- */
-public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
-  private String stepName;
-  private BeamSqlExpressionExecutor executor;
-  private BeamSqlRowType outputRowType;
-
-  public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
-      BeamSqlRowType outputRowType) {
-    super();
-    this.stepName = stepName;
-    this.executor = executor;
-    this.outputRowType = outputRowType;
-  }
-
-  @Setup
-  public void setup() {
-    executor.prepare();
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext c, BoundedWindow window) {
-    BeamSqlRow inputRow = c.element();
-    List<Object> results = executor.execute(inputRow);
-
-    BeamSqlRow outRow = new BeamSqlRow(outputRowType);
-    outRow.updateWindowRange(inputRow, window);
-
-    for (int idx = 0; idx < results.size(); ++idx) {
-      BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));
-    }
-
-    c.output(outRow);
-  }
-
-  @Teardown
-  public void close() {
-    executor.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java
deleted file mode 100644
index 7797ddf..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/transform/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSql pipeline.
- */
-package org.apache.beam.sdk.extensions.sql.transform;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java
deleted file mode 100644
index 9970955..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/CalciteUtils.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.utils;
-
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Utility methods for Calcite related operations.
- */
-public class CalciteUtils {
-  private static final Map<Integer, SqlTypeName> JAVA_TO_CALCITE_MAPPING = new HashMap<>();
-  private static final Map<SqlTypeName, Integer> CALCITE_TO_JAVA_MAPPING = new HashMap<>();
-  static {
-    JAVA_TO_CALCITE_MAPPING.put(Types.TINYINT, SqlTypeName.TINYINT);
-    JAVA_TO_CALCITE_MAPPING.put(Types.SMALLINT, SqlTypeName.SMALLINT);
-    JAVA_TO_CALCITE_MAPPING.put(Types.INTEGER, SqlTypeName.INTEGER);
-    JAVA_TO_CALCITE_MAPPING.put(Types.BIGINT, SqlTypeName.BIGINT);
-
-    JAVA_TO_CALCITE_MAPPING.put(Types.FLOAT, SqlTypeName.FLOAT);
-    JAVA_TO_CALCITE_MAPPING.put(Types.DOUBLE, SqlTypeName.DOUBLE);
-
-    JAVA_TO_CALCITE_MAPPING.put(Types.DECIMAL, SqlTypeName.DECIMAL);
-
-    JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR);
-    JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR);
-
-    JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE);
-    JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME);
-    JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP);
-
-    JAVA_TO_CALCITE_MAPPING.put(Types.BOOLEAN, SqlTypeName.BOOLEAN);
-
-    for (Map.Entry<Integer, SqlTypeName> pair : JAVA_TO_CALCITE_MAPPING.entrySet()) {
-      CALCITE_TO_JAVA_MAPPING.put(pair.getValue(), pair.getKey());
-    }
-  }
-
-  /**
-   * Get the corresponding {@code SqlTypeName} for an integer sql type.
-   */
-  public static SqlTypeName toCalciteType(int type) {
-    return JAVA_TO_CALCITE_MAPPING.get(type);
-  }
-
-  /**
-   * Get the integer sql type from Calcite {@code SqlTypeName}.
-   */
-  public static Integer toJavaType(SqlTypeName typeName) {
-    return CALCITE_TO_JAVA_MAPPING.get(typeName);
-  }
-
-  /**
-   * Get the {@code SqlTypeName} for the specified column of a table.
-   */
-  public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) {
-    return toCalciteType(schema.getFieldsType().get(index));
-  }
-
-  /**
-   * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
-   */
-  public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) {
-    List<String> fieldNames = new ArrayList<>();
-    List<Integer> fieldTypes = new ArrayList<>();
-    for (RelDataTypeField f : tableInfo.getFieldList()) {
-      fieldNames.add(f.getName());
-      fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
-    }
-    return BeamSqlRowType.create(fieldNames, fieldTypes);
-  }
-
-  /**
-   * Create an instance of {@code RelDataType} so it can be used to create a table.
-   */
-  public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) {
-    return new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a) {
-        RelDataTypeFactory.FieldInfoBuilder builder = a.builder();
-        for (int idx = 0; idx < that.getFieldsName().size(); ++idx) {
-          builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx)));
-        }
-        return builder.build();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java
deleted file mode 100644
index e4d6148..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/utils/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Utility classes.
- */
-package org.apache.beam.sdk.extensions.sql.utils;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index 363ab8f..d75af9b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.beam.sdk.extensions.sql;
 
-import static org.apache.beam.sdk.extensions.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
-import static org.apache.beam.sdk.extensions.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
+import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
+import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
 
 import java.sql.Types;
 import java.util.Arrays;