You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:30 UTC
[07/59] beam git commit: move dsls/sql to sdks/java/extensions/sql
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
new file mode 100644
index 0000000..ba344df
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Sort} node.
+ *
+ * <p>Since Beam does not fully supported global sort we are using {@link Top} to implement
+ * the {@code Sort} algebra. The following types of ORDER BY are supported:
+
+ * <pre>{@code
+ * select * from t order by id desc limit 10;
+ * select * from t order by id desc limit 10, 5;
+ * }</pre>
+ *
+ * <p>but Order BY without a limit is NOT supported:
+ *
+ * <pre>{@code
+ * select * from t order by id desc
+ * }</pre>
+ *
+ * <h3>Constraints</h3>
+ * <ul>
+ * <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT`
+ * must fit into the memory of a single machine.</li>
+ * <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`,
+ * it does not make much sense to use `ORDER BY` with `WINDOW`.
+ * </li>
+ * </ul>
+ */
+public class BeamSortRel extends Sort implements BeamRelNode {
+ private List<Integer> fieldIndices = new ArrayList<>();
+ private List<Boolean> orientation = new ArrayList<>();
+ private List<Boolean> nullsFirst = new ArrayList<>();
+
+ private int startIndex = 0;
+ private int count;
+
+ public BeamSortRel(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode child,
+ RelCollation collation,
+ RexNode offset,
+ RexNode fetch) {
+ super(cluster, traits, child, collation, offset, fetch);
+
+ List<RexNode> fieldExps = getChildExps();
+ RelCollationImpl collationImpl = (RelCollationImpl) collation;
+ List<RelFieldCollation> collations = collationImpl.getFieldCollations();
+ for (int i = 0; i < fieldExps.size(); i++) {
+ RexNode fieldExp = fieldExps.get(i);
+ RexInputRef inputRef = (RexInputRef) fieldExp;
+ fieldIndices.add(inputRef.getIndex());
+ orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING);
+
+ RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection;
+ if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) {
+ rawNullDirection = collations.get(i).getDirection().defaultNullDirection();
+ }
+ nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST);
+ }
+
+ if (fetch == null) {
+ throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!");
+ }
+
+ RexLiteral fetchLiteral = (RexLiteral) fetch;
+ count = ((BigDecimal) fetchLiteral.getValue()).intValue();
+
+ if (offset != null) {
+ RexLiteral offsetLiteral = (RexLiteral) offset;
+ startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
+ }
+ }
+
+ @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
+ RelNode input = getInput();
+ PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
+ .buildBeamPipeline(inputPCollections, sqlEnv);
+ Type windowType = upstream.getWindowingStrategy().getWindowFn()
+ .getWindowTypeDescriptor().getType();
+ if (!windowType.equals(GlobalWindow.class)) {
+ throw new UnsupportedOperationException(
+ "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType);
+ }
+
+ BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation,
+ nullsFirst);
+ // first find the top (offset + count)
+ PCollection<List<BeamSqlRow>> rawStream =
+ upstream.apply("extractTopOffsetAndFetch",
+ Top.of(startIndex + count, comparator).withoutDefaults())
+ .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
+
+ // strip the `leading offset`
+ if (startIndex > 0) {
+ rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
+ new SubListFn<BeamSqlRow>(startIndex, startIndex + count)))
+ .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
+ }
+
+ PCollection<BeamSqlRow> orderedStream = rawStream.apply(
+ "flatten", Flatten.<BeamSqlRow>iterables());
+ orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+
+ return orderedStream;
+ }
+
+ private static class SubListFn<T> extends DoFn<List<T>, List<T>> {
+ private int startIndex;
+ private int endIndex;
+
+ public SubListFn(int startIndex, int endIndex) {
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext ctx) {
+ ctx.output(ctx.element().subList(startIndex, endIndex));
+ }
+ }
+
+ @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation,
+ RexNode offset, RexNode fetch) {
+ return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+ }
+
+ private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable {
+ private List<Integer> fieldsIndices;
+ private List<Boolean> orientation;
+ private List<Boolean> nullsFirst;
+
+ public BeamSqlRowComparator(List<Integer> fieldsIndices,
+ List<Boolean> orientation,
+ List<Boolean> nullsFirst) {
+ this.fieldsIndices = fieldsIndices;
+ this.orientation = orientation;
+ this.nullsFirst = nullsFirst;
+ }
+
+ @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) {
+ for (int i = 0; i < fieldsIndices.size(); i++) {
+ int fieldIndex = fieldsIndices.get(i);
+ int fieldRet = 0;
+ SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex);
+ // whether NULL should be ordered first or last(compared to non-null values) depends on
+ // what user specified in SQL(NULLS FIRST/NULLS LAST)
+ if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
+ continue;
+ } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) {
+ fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1);
+ } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
+ fieldRet = 1 * (nullsFirst.get(i) ? -1 : 1);
+ } else {
+ switch (fieldType) {
+ case TINYINT:
+ fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex));
+ break;
+ case SMALLINT:
+ fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex));
+ break;
+ case INTEGER:
+ fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex));
+ break;
+ case BIGINT:
+ fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex));
+ break;
+ case FLOAT:
+ fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex));
+ break;
+ case DOUBLE:
+ fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex));
+ break;
+ case VARCHAR:
+ fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex));
+ break;
+ case DATE:
+ fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Data type: " + fieldType + " not supported yet!");
+ }
+ }
+
+ fieldRet *= (orientation.get(i) ? -1 : 1);
+ if (fieldRet != 0) {
+ return fieldRet;
+ }
+ }
+ return 0;
+ }
+ }
+
+ public static <T extends Number & Comparable> int numberCompare(T a, T b) {
+ return a.compareTo(b);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java
new file mode 100644
index 0000000..9f1f703
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for {@code BeamRelNode}.
+ */
+class BeamSqlRelUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class);
+
+ private static final AtomicInteger sequence = new AtomicInteger(0);
+ private static final AtomicInteger classSequence = new AtomicInteger(0);
+
+ public static String getStageName(BeamRelNode relNode) {
+ return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
+ + sequence.getAndIncrement();
+ }
+
+ public static String getClassName(BeamRelNode relNode) {
+ return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
+ + "_" + classSequence.getAndIncrement();
+ }
+
+ public static BeamRelNode getBeamRelInput(RelNode input) {
+ if (input instanceof RelSubset) {
+ // go with known best input
+ input = ((RelSubset) input).getBest();
+ }
+ return (BeamRelNode) input;
+ }
+
+ public static String explain(final RelNode rel) {
+ return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+ }
+
+ public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
+ String explain = "";
+ try {
+ explain = RelOptUtil.toString(rel);
+ } catch (StackOverflowError e) {
+ LOG.error("StackOverflowError occurred while extracting plan. "
+ + "Please report it to the dev@ mailing list.");
+ LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+ LOG.error("Forcing plan to empty string and continue... "
+ + "SQL Runner may not working properly after.");
+ }
+ return explain;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
new file mode 100644
index 0000000..c661585
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
+
+/**
+ * {@link BeamRelNode} to replace a {@link Union}.
+ *
+ * <p>{@code BeamUnionRel} needs the input of it have the same {@link WindowFn}. From the SQL
+ * perspective, two cases are supported:
+ *
+ * <p>1) Do not use {@code grouped window function}:
+ *
+ * <pre>{@code
+ * select * from person UNION select * from person
+ * }</pre>
+ *
+ * <p>2) Use the same {@code grouped window function}, with the same param:
+ * <pre>{@code
+ * select id, count(*) from person
+ * group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ * UNION
+ * select * from person
+ * group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ * }</pre>
+ *
+ * <p>Inputs with different group functions are NOT supported:
+ * <pre>{@code
+ * select id, count(*) from person
+ * group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ * UNION
+ * select * from person
+ * group by id, TUMBLE(order_time, INTERVAL '2' HOUR)
+ * }</pre>
+ */
+public class BeamUnionRel extends Union implements BeamRelNode {
+ private BeamSetOperatorRelBase delegate;
+ public BeamUnionRel(RelOptCluster cluster,
+ RelTraitSet traits,
+ List<RelNode> inputs,
+ boolean all) {
+ super(cluster, traits, inputs, all);
+ this.delegate = new BeamSetOperatorRelBase(this,
+ BeamSetOperatorRelBase.OpType.UNION,
+ inputs, all);
+ }
+
+ public BeamUnionRel(RelInput input) {
+ super(input);
+ }
+
+ @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new BeamUnionRel(getCluster(), traitSet, inputs, all);
+ }
+
+ @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
+ return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
new file mode 100644
index 0000000..43b74c3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.schema.BeamTableUtils;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Values} node.
+ *
+ * <p>{@code BeamValuesRel} will be used in the following SQLs:
+ * <ul>
+ * <li>{@code insert into t (name, desc) values ('hello', 'world')}</li>
+ * <li>{@code select 1, '1', LOCALTIME}</li>
+ * </ul>
+ */
+public class BeamValuesRel extends Values implements BeamRelNode {
+
+ public BeamValuesRel(
+ RelOptCluster cluster,
+ RelDataType rowType,
+ ImmutableList<ImmutableList<RexLiteral>> tuples,
+ RelTraitSet traits) {
+ super(cluster, rowType, tuples, traits);
+
+ }
+
+ @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
+ List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
+ String stageName = BeamSqlRelUtils.getStageName(this);
+ if (tuples.isEmpty()) {
+ throw new IllegalStateException("Values with empty tuples!");
+ }
+
+ BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
+ for (ImmutableList<RexLiteral> tuple : tuples) {
+ BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
+ for (int i = 0; i < tuple.size(); i++) {
+ BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
+ }
+ rows.add(row);
+ }
+
+ return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
+ .setCoder(new BeamSqlRowCoder(beamSQLRowType));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
new file mode 100644
index 0000000..77d6204
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}.
+ *
+ */
+package org.apache.beam.dsls.sql.rel;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
new file mode 100644
index 0000000..6e843d4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import com.google.common.collect.ImmutableList;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.dsls.sql.rel.BeamAggregationRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.joda.time.Duration;
+
+/**
+ * Rule to detect the window/trigger settings.
+ *
+ */
+public class BeamAggregationRule extends RelOptRule {
+ public static final BeamAggregationRule INSTANCE =
+ new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
+
+ public BeamAggregationRule(
+ Class<? extends Aggregate> aggregateClass,
+ Class<? extends Project> projectClass,
+ RelBuilderFactory relBuilderFactory) {
+ super(
+ operand(aggregateClass,
+ operand(projectClass, any())),
+ relBuilderFactory, null);
+ }
+
+ public BeamAggregationRule(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Aggregate aggregate = call.rel(0);
+ final Project project = call.rel(1);
+ updateWindowTrigger(call, aggregate, project);
+ }
+
+ private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate,
+ Project project) {
+ ImmutableBitSet groupByFields = aggregate.getGroupSet();
+ List<RexNode> projectMapping = project.getProjects();
+
+ WindowFn windowFn = new GlobalWindows();
+ Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow());
+ int windowFieldIdx = -1;
+ Duration allowedLatence = Duration.ZERO;
+
+ for (int groupField : groupByFields.asList()) {
+ RexNode projNode = projectMapping.get(groupField);
+ if (projNode instanceof RexCall) {
+ SqlOperator op = ((RexCall) projNode).op;
+ ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
+ String functionName = op.getName();
+ switch (functionName) {
+ case "TUMBLE":
+ windowFieldIdx = groupField;
+ windowFn = FixedWindows
+ .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
+ if (parameters.size() == 3) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+ }
+ break;
+ case "HOP":
+ windowFieldIdx = groupField;
+ windowFn = SlidingWindows
+ .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))))
+ .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2))));
+ if (parameters.size() == 4) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+ }
+ break;
+ case "SESSION":
+ windowFieldIdx = groupField;
+ windowFn = Sessions
+ .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
+ if (parameters.size() == 3) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(),
+ aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(aggregate.getInput(),
+ aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ aggregate.indicator,
+ aggregate.getGroupSet(),
+ aggregate.getGroupSets(),
+ aggregate.getAggCallList(),
+ windowFn,
+ triggerFn,
+ windowFieldIdx,
+ allowedLatence);
+ call.transformTo(newAggregator);
+ }
+
+ private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
+ return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
+ .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
+ }
+
+ private long getWindowParameterAsMillis(RexNode parameterNode) {
+ if (parameterNode instanceof RexLiteral) {
+ return RexLiteral.intValue(parameterNode);
+ } else {
+ throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java
new file mode 100644
index 0000000..414b666
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+
+/**
+ * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
+ *
+ */
+public class BeamFilterRule extends ConverterRule {
+ public static final BeamFilterRule INSTANCE = new BeamFilterRule();
+
+ private BeamFilterRule() {
+ super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Filter filter = (Filter) rel;
+ final RelNode input = filter.getInput();
+
+ return new BeamFilterRel(filter.getCluster(),
+ filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ filter.getCondition());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
new file mode 100644
index 0000000..4cc4ef5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.rel.BeamIOSinkRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableModify} with
+ * {@link BeamIOSinkRel}.
+ *
+ */
+public class BeamIOSinkRule extends ConverterRule {
+ public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
+
+ private BeamIOSinkRule() {
+ super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+ "BeamIOSinkRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableModify tableModify = (TableModify) rel;
+ final RelNode input = tableModify.getInput();
+
+ final RelOptCluster cluster = tableModify.getCluster();
+ final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
+ final RelOptTable relOptTable = tableModify.getTable();
+ final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+ final RelNode convertedInput = convert(input,
+ input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+ final TableModify.Operation operation = tableModify.getOperation();
+ final List<String> updateColumnList = tableModify.getUpdateColumnList();
+ final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+ final boolean flattened = tableModify.isFlattened();
+
+ final Table table = tableModify.getTable().unwrap(Table.class);
+
+ switch (table.getJdbcTableType()) {
+ case TABLE:
+ case STREAM:
+ if (operation != TableModify.Operation.INSERT) {
+ throw new UnsupportedOperationException(
+ String.format("Streams doesn't support %s modify operation", operation));
+ }
+ return new BeamIOSinkRel(cluster, traitSet,
+ relOptTable, catalogReader, convertedInput, operation, updateColumnList,
+ sourceExpressionList, flattened);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported table type: %s", table.getJdbcTableType()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
new file mode 100644
index 0000000..85a69ff
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamIOSourceRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableScan} with
+ * {@link BeamIOSourceRel}.
+ *
+ */
+public class BeamIOSourceRule extends ConverterRule {
+ public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
+
+ private BeamIOSourceRule() {
+ super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+ "BeamIOSourceRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableScan scan = (TableScan) rel;
+
+ return new BeamIOSourceRel(scan.getCluster(),
+ scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
new file mode 100644
index 0000000..70716c5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.rel.BeamIntersectRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+
+/**
+ * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}.
+ */
+public class BeamIntersectRule extends ConverterRule {
+ public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();
+ private BeamIntersectRule() {
+ super(LogicalIntersect.class, Convention.NONE,
+ BeamLogicalConvention.INSTANCE, "BeamIntersectRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Intersect intersect = (Intersect) rel;
+ final List<RelNode> inputs = intersect.getInputs();
+ return new BeamIntersectRel(
+ intersect.getCluster(),
+ intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convertList(inputs, BeamLogicalConvention.INSTANCE),
+ intersect.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
new file mode 100644
index 0000000..78253fe
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamJoinRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+/**
+ * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
+ */
+public class BeamJoinRule extends ConverterRule {
+ public static final BeamJoinRule INSTANCE = new BeamJoinRule();
+ private BeamJoinRule() {
+ super(LogicalJoin.class, Convention.NONE,
+ BeamLogicalConvention.INSTANCE, "BeamJoinRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Join join = (Join) rel;
+ return new BeamJoinRel(
+ join.getCluster(),
+ join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(join.getLeft(),
+ join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ convert(join.getRight(),
+ join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ join.getCondition(),
+ join.getVariablesSet(),
+ join.getJoinType()
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
new file mode 100644
index 0000000..ca93c71
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamMinusRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.logical.LogicalMinus;
+
+/**
+ * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}.
+ */
+public class BeamMinusRule extends ConverterRule {
+ public static final BeamMinusRule INSTANCE = new BeamMinusRule();
+ private BeamMinusRule() {
+ super(LogicalMinus.class, Convention.NONE,
+ BeamLogicalConvention.INSTANCE, "BeamMinusRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Minus minus = (Minus) rel;
+ final List<RelNode> inputs = minus.getInputs();
+ return new BeamMinusRel(
+ minus.getCluster(),
+ minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convertList(inputs, BeamLogicalConvention.INSTANCE),
+ minus.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
new file mode 100644
index 0000000..6dc3b57
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+
+/**
+ * A {@code ConverterRule} to replace {@link Project} with
+ * {@link BeamProjectRel}.
+ *
+ */
+public class BeamProjectRule extends ConverterRule {
+ public static final BeamProjectRule INSTANCE = new BeamProjectRule();
+
+ private BeamProjectRule() {
+ super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Project project = (Project) rel;
+ final RelNode input = project.getInput();
+
+ return new BeamProjectRel(project.getCluster(),
+ project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ project.getProjects(), project.getRowType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
new file mode 100644
index 0000000..d802e9d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+
+import org.apache.beam.dsls.sql.rel.BeamSortRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalSort;
+
+/**
+ * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}.
+ */
+public class BeamSortRule extends ConverterRule {
+ public static final BeamSortRule INSTANCE = new BeamSortRule();
+ private BeamSortRule() {
+ super(LogicalSort.class, Convention.NONE,
+ BeamLogicalConvention.INSTANCE, "BeamSortRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Sort sort = (Sort) rel;
+ final RelNode input = sort.getInput();
+ return new BeamSortRel(
+ sort.getCluster(),
+ sort.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ sort.getCollation(),
+ sort.offset,
+ sort.fetch
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
new file mode 100644
index 0000000..b8430b9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamUnionRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalUnion;
+
+/**
+ * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with
+ * {@link BeamUnionRule}.
+ */
+public class BeamUnionRule extends ConverterRule {
+ public static final BeamUnionRule INSTANCE = new BeamUnionRule();
+ private BeamUnionRule() {
+ super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+ "BeamUnionRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Union union = (Union) rel;
+
+ return new BeamUnionRel(
+ union.getCluster(),
+ union.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convertList(union.getInputs(), BeamLogicalConvention.INSTANCE),
+ union.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
new file mode 100644
index 0000000..4ea9e60
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamValuesRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.logical.LogicalValues;
+
+/**
+ * {@code ConverterRule} to replace {@code Values} with {@code BeamValuesRel}.
+ */
+public class BeamValuesRule extends ConverterRule {
+ public static final BeamValuesRule INSTANCE = new BeamValuesRule();
+ private BeamValuesRule() {
+ super(LogicalValues.class, Convention.NONE,
+ BeamLogicalConvention.INSTANCE, "BeamValuesRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Values values = (Values) rel;
+ return new BeamValuesRel(
+ values.getCluster(),
+ values.getRowType(),
+ values.getTuples(),
+ values.getTraitSet().replace(BeamLogicalConvention.INSTANCE)
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
new file mode 100644
index 0000000..5d32647
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.calcite.plan.RelOptRule} to generate
+ * {@link org.apache.beam.dsls.sql.rel.BeamRelNode}.
+ */
+package org.apache.beam.dsls.sql.rule;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
new file mode 100644
index 0000000..dfa2785
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
+ */
+public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
+ protected BeamSqlRowType beamSqlRowType;
+ public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
+ this.beamSqlRowType = beamSqlRowType;
+ }
+
+ @Override public BeamSqlRowType getRowType() {
+ return beamSqlRowType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
new file mode 100644
index 0000000..502e8c1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Type as a source IO, determined whether it's a STREAMING process, or batch
+ * process.
+ */
+public enum BeamIOType implements Serializable {
+ BOUNDED, UNBOUNDED;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
new file mode 100644
index 0000000..5b63780
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
+ * then a downstream query can query directly.
+ */
+public class BeamPCollectionTable extends BaseBeamTable {
+ private BeamIOType ioType;
+ private transient PCollection<BeamSqlRow> upstream;
+
+ protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
+ super(beamSqlRowType);
+ }
+
+ public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
+ BeamSqlRowType beamSqlRowType){
+ this(beamSqlRowType);
+ ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
+ ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
+ this.upstream = upstream;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return ioType;
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ return upstream;
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
new file mode 100644
index 0000000..d789446
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Instant;
+
+/**
+ * Represent a generic ROW record in Beam SQL.
+ *
+ */
+public class BeamSqlRow implements Serializable {
+ private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+ static {
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+ }
+
+ private List<Integer> nullFields = new ArrayList<>();
+ private List<Object> dataValues;
+ private BeamSqlRowType dataType;
+
+ private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+ private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+
+ public BeamSqlRow(BeamSqlRowType dataType) {
+ this.dataType = dataType;
+ this.dataValues = new ArrayList<>();
+ for (int idx = 0; idx < dataType.size(); ++idx) {
+ dataValues.add(null);
+ nullFields.add(idx);
+ }
+ }
+
+ public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
+ this(dataType);
+ for (int idx = 0; idx < dataValues.size(); ++idx) {
+ addField(idx, dataValues.get(idx));
+ }
+ }
+
+ public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){
+ windowStart = upstreamRecord.windowStart;
+ windowEnd = upstreamRecord.windowEnd;
+
+ if (window instanceof IntervalWindow) {
+ IntervalWindow iWindow = (IntervalWindow) window;
+ windowStart = iWindow.start();
+ windowEnd = iWindow.end();
+ }
+ }
+
+ public void addField(String fieldName, Object fieldValue) {
+ addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+ }
+
+ public void addField(int index, Object fieldValue) {
+ if (fieldValue == null) {
+ return;
+ } else {
+ if (nullFields.contains(index)) {
+ nullFields.remove(nullFields.indexOf(index));
+ }
+ }
+
+ validateValueType(index, fieldValue);
+ dataValues.set(index, fieldValue);
+ }
+
+ private void validateValueType(int index, Object fieldValue) {
+ SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
+ Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
+ if (javaClazz == null) {
+ throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
+ }
+
+ if (!fieldValue.getClass().equals(javaClazz)) {
+ throw new IllegalArgumentException(
+ String.format("[%s](%s) doesn't match type [%s]",
+ fieldValue, fieldValue.getClass(), fieldType)
+ );
+ }
+ }
+
+ public Object getFieldValue(String fieldName) {
+ return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+ }
+
+ public byte getByte(String fieldName) {
+ return (Byte) getFieldValue(fieldName);
+ }
+
+ public short getShort(String fieldName) {
+ return (Short) getFieldValue(fieldName);
+ }
+
+ public int getInteger(String fieldName) {
+ return (Integer) getFieldValue(fieldName);
+ }
+
+ public float getFloat(String fieldName) {
+ return (Float) getFieldValue(fieldName);
+ }
+
+ public double getDouble(String fieldName) {
+ return (Double) getFieldValue(fieldName);
+ }
+
+ public long getLong(String fieldName) {
+ return (Long) getFieldValue(fieldName);
+ }
+
+ public String getString(String fieldName) {
+ return (String) getFieldValue(fieldName);
+ }
+
+ public Date getDate(String fieldName) {
+ return (Date) getFieldValue(fieldName);
+ }
+
+ public GregorianCalendar getGregorianCalendar(String fieldName) {
+ return (GregorianCalendar) getFieldValue(fieldName);
+ }
+
+ public BigDecimal getBigDecimal(String fieldName) {
+ return (BigDecimal) getFieldValue(fieldName);
+ }
+
+ public boolean getBoolean(String fieldName) {
+ return (boolean) getFieldValue(fieldName);
+ }
+
+ public Object getFieldValue(int fieldIdx) {
+ if (nullFields.contains(fieldIdx)) {
+ return null;
+ }
+
+ return dataValues.get(fieldIdx);
+ }
+
+ public byte getByte(int idx) {
+ return (Byte) getFieldValue(idx);
+ }
+
+ public short getShort(int idx) {
+ return (Short) getFieldValue(idx);
+ }
+
+ public int getInteger(int idx) {
+ return (Integer) getFieldValue(idx);
+ }
+
+ public float getFloat(int idx) {
+ return (Float) getFieldValue(idx);
+ }
+
+ public double getDouble(int idx) {
+ return (Double) getFieldValue(idx);
+ }
+
+ public long getLong(int idx) {
+ return (Long) getFieldValue(idx);
+ }
+
+ public String getString(int idx) {
+ return (String) getFieldValue(idx);
+ }
+
+ public Date getDate(int idx) {
+ return (Date) getFieldValue(idx);
+ }
+
+ public GregorianCalendar getGregorianCalendar(int idx) {
+ return (GregorianCalendar) getFieldValue(idx);
+ }
+
+ public BigDecimal getBigDecimal(int idx) {
+ return (BigDecimal) getFieldValue(idx);
+ }
+
+ public boolean getBoolean(int idx) {
+ return (boolean) getFieldValue(idx);
+ }
+
+ public int size() {
+ return dataValues.size();
+ }
+
+ public List<Object> getDataValues() {
+ return dataValues;
+ }
+
+ public void setDataValues(List<Object> dataValues) {
+ this.dataValues = dataValues;
+ }
+
+ public BeamSqlRowType getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(BeamSqlRowType dataType) {
+ this.dataType = dataType;
+ }
+
+ public void setNullFields(List<Integer> nullFields) {
+ this.nullFields = nullFields;
+ }
+
+ public List<Integer> getNullFields() {
+ return nullFields;
+ }
+
+ /**
+ * is the specified field NULL?
+ */
+ public boolean isNull(int idx) {
+ return nullFields.contains(idx);
+ }
+
+ public Instant getWindowStart() {
+ return windowStart;
+ }
+
+ public Instant getWindowEnd() {
+ return windowEnd;
+ }
+
+ public void setWindowStart(Instant windowStart) {
+ this.windowStart = windowStart;
+ }
+
+ public void setWindowEnd(Instant windowEnd) {
+ this.windowEnd = windowEnd;
+ }
+
+ @Override
+ public String toString() {
+ return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
+ + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
+ }
+
+ /**
+ * Return data fields as key=value.
+ */
+ public String valueInString() {
+ StringBuilder sb = new StringBuilder();
+ for (int idx = 0; idx < size(); ++idx) {
+ sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
+ }
+ return sb.substring(1);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ BeamSqlRow other = (BeamSqlRow) obj;
+ return toString().equals(other.toString());
+ }
+
+ @Override public int hashCode() {
+ return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
new file mode 100644
index 0000000..f14864a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * A {@link Coder} encodes {@link BeamSqlRow}.
+ */
+public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
+ private BeamSqlRowType tableSchema;
+
+ private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
+
+ private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+ private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
+ private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+ private static final DoubleCoder doubleCoder = DoubleCoder.of();
+ private static final InstantCoder instantCoder = InstantCoder.of();
+ private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
+ private static final ByteCoder byteCoder = ByteCoder.of();
+
+ public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ @Override
+ public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
+ listCoder.encode(value.getNullFields(), outStream);
+ for (int idx = 0; idx < value.size(); ++idx) {
+ if (value.getNullFields().contains(idx)) {
+ continue;
+ }
+
+ switch (CalciteUtils.getFieldType(value.getDataType(), idx)) {
+ case INTEGER:
+ intCoder.encode(value.getInteger(idx), outStream);
+ break;
+ case SMALLINT:
+ intCoder.encode((int) value.getShort(idx), outStream);
+ break;
+ case TINYINT:
+ byteCoder.encode(value.getByte(idx), outStream);
+ break;
+ case DOUBLE:
+ doubleCoder.encode(value.getDouble(idx), outStream);
+ break;
+ case FLOAT:
+ doubleCoder.encode((double) value.getFloat(idx), outStream);
+ break;
+ case DECIMAL:
+ bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
+ break;
+ case BIGINT:
+ longCoder.encode(value.getLong(idx), outStream);
+ break;
+ case VARCHAR:
+ case CHAR:
+ stringCoder.encode(value.getString(idx), outStream);
+ break;
+ case TIME:
+ longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
+ break;
+ case DATE:
+ case TIMESTAMP:
+ longCoder.encode(value.getDate(idx).getTime(), outStream);
+ break;
+ case BOOLEAN:
+ byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream);
+ break;
+
+ default:
+ throw new UnsupportedOperationException(
+ "Data type: " + value.getDataType().getFieldsType().get(idx) + " not supported yet!");
+ }
+ }
+
+ instantCoder.encode(value.getWindowStart(), outStream);
+ instantCoder.encode(value.getWindowEnd(), outStream);
+ }
+
+ @Override
+ public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
+ List<Integer> nullFields = listCoder.decode(inStream);
+
+ BeamSqlRow record = new BeamSqlRow(tableSchema);
+ record.setNullFields(nullFields);
+ for (int idx = 0; idx < tableSchema.size(); ++idx) {
+ if (nullFields.contains(idx)) {
+ continue;
+ }
+
+ switch (CalciteUtils.getFieldType(tableSchema, idx)) {
+ case INTEGER:
+ record.addField(idx, intCoder.decode(inStream));
+ break;
+ case SMALLINT:
+ record.addField(idx, intCoder.decode(inStream).shortValue());
+ break;
+ case TINYINT:
+ record.addField(idx, byteCoder.decode(inStream));
+ break;
+ case DOUBLE:
+ record.addField(idx, doubleCoder.decode(inStream));
+ break;
+ case FLOAT:
+ record.addField(idx, doubleCoder.decode(inStream).floatValue());
+ break;
+ case BIGINT:
+ record.addField(idx, longCoder.decode(inStream));
+ break;
+ case DECIMAL:
+ record.addField(idx, bigDecimalCoder.decode(inStream));
+ break;
+ case VARCHAR:
+ case CHAR:
+ record.addField(idx, stringCoder.decode(inStream));
+ break;
+ case TIME:
+ GregorianCalendar calendar = new GregorianCalendar();
+ calendar.setTime(new Date(longCoder.decode(inStream)));
+ record.addField(idx, calendar);
+ break;
+ case DATE:
+ case TIMESTAMP:
+ record.addField(idx, new Date(longCoder.decode(inStream)));
+ break;
+ case BOOLEAN:
+ record.addField(idx, byteCoder.decode(inStream) == 1);
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Data type: "
+ + CalciteUtils.toCalciteType(tableSchema.getFieldsType().get(idx))
+ + " not supported yet!");
+ }
+ }
+
+ record.setWindowStart(instantCoder.decode(inStream));
+ record.setWindowEnd(instantCoder.decode(inStream));
+
+ return record;
+ }
+
+ public BeamSqlRowType getTableSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public void verifyDeterministic()
+ throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
new file mode 100644
index 0000000..1129bdd
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Field type information in {@link BeamSqlRow}.
+ *
+ */
+@AutoValue
+public abstract class BeamSqlRowType implements Serializable {
+ public abstract List<String> getFieldsName();
+ public abstract List<Integer> getFieldsType();
+
+ public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
+ return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
+ }
+
+ public int size() {
+ return getFieldsName().size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
new file mode 100644
index 0000000..d419473
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * This interface defines a Beam Sql Table.
+ */
+public interface BeamSqlTable {
+ /**
+ * In Beam SQL, there's no difference between a batch query and a streaming
+ * query. {@link BeamIOType} is used to validate the sources.
+ */
+ BeamIOType getSourceType();
+
+ /**
+ * create a {@code PCollection<BeamSqlRow>} from source.
+ *
+ */
+ PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
+
+ /**
+ * create a {@code IO.write()} instance to write to target.
+ *
+ */
+ PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
+
+ /**
+ * Get the schema info of the table.
+ */
+ BeamSqlRowType getRowType();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java
new file mode 100644
index 0000000..9582ffa
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.lang.reflect.ParameterizedType;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * abstract class of aggregation functions in Beam SQL.
+ *
+ * <p>There're several constrains for a UDAF:<br>
+ * 1. A constructor with an empty argument list is required;<br>
+ * 2. The type of {@code InputT} and {@code OutputT} can only be Interger/Long/Short/Byte/Double
+ * /Float/Date/BigDecimal, mapping as SQL type INTEGER/BIGINT/SMALLINT/TINYINE/DOUBLE/FLOAT
+ * /TIMESTAMP/DECIMAL;<br>
+ * 3. Keep intermediate data in {@code AccumT}, and do not rely on elements in class;<br>
+ */
+public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
+ public BeamSqlUdaf(){}
+
+ /**
+ * create an initial aggregation object, equals to {@link CombineFn#createAccumulator()}.
+ */
+ public abstract AccumT init();
+
+ /**
+ * add an input value, equals to {@link CombineFn#addInput(Object, Object)}.
+ */
+ public abstract AccumT add(AccumT accumulator, InputT input);
+
+ /**
+ * merge aggregation objects from parallel tasks, equals to
+ * {@link CombineFn#mergeAccumulators(Iterable)}.
+ */
+ public abstract AccumT merge(Iterable<AccumT> accumulators);
+
+ /**
+ * extract output value from aggregation object, equals to
+ * {@link CombineFn#extractOutput(Object)}.
+ */
+ public abstract OutputT result(AccumT accumulator);
+
+ /**
+ * get the coder for AccumT which stores the intermediate result.
+ * By default it's fetched from {@link CoderRegistry}.
+ */
+ public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry)
+ throws CannotProvideCoderException {
+ return registry.getCoder(
+ (Class<AccumT>) ((ParameterizedType) getClass()
+ .getGenericSuperclass()).getActualTypeArguments()[1]);
+ }
+}