You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:40 UTC
[17/59] beam git commit: move dsls/sql to sdks/java/extensions/sql
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
deleted file mode 100644
index d4c98a3..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rel;
-
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.rel.RelNode;
-
-/**
- * A Calcite {@link RelNode} that can additionally translate itself into a Beam pipeline.
- *
- * <p>On top of the {@link RelNode} contract, a new method
- * {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added.
- */
-public interface BeamRelNode extends RelNode {
-
- /**
- * Builds the part of the Beam pipeline that corresponds to this node.
- *
- * <p>A {@link BeamRelNode} is a recursive structure, the
- * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
- * algorithm.
- *
- * @param inputPCollections the input collections made available to the query
- * @param sqlEnv the SQL environment the query is planned in
- * @return the rows produced by this node
- * @throws Exception if the pipeline for this node cannot be built
- */
- PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
- throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
deleted file mode 100644
index 939c9c8..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.rel.RelNode;
-
-/**
- * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel}
- * and {@code BeamMinusRel}.
- */
-public class BeamSetOperatorRelBase {
- /**
- * Set operator type.
- */
- public enum OpType implements Serializable {
- UNION,
- INTERSECT,
- MINUS
- }
-
- private BeamRelNode beamRelNode;
- private List<RelNode> inputs;
- private boolean all;
- private OpType opType;
-
- public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType,
- List<RelNode> inputs, boolean all) {
- this.beamRelNode = beamRelNode;
- this.opType = opType;
- this.inputs = inputs;
- this.all = all;
- }
-
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
- .buildBeamPipeline(inputPCollections, sqlEnv);
- PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
- .buildBeamPipeline(inputPCollections, sqlEnv);
-
- WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
- WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
- if (!leftWindow.isCompatible(rightWindow)) {
- throw new IllegalArgumentException(
- "inputs of " + opType + " have different window strategy: "
- + leftWindow + " VS " + rightWindow);
- }
-
- final TupleTag<BeamSqlRow> leftTag = new TupleTag<>();
- final TupleTag<BeamSqlRow> rightTag = new TupleTag<>();
-
- // co-group
- String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
- PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
- .of(leftTag, leftRows.apply(
- stageName + "_CreateLeftIndex", MapElements.via(
- new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
- .and(rightTag, rightRows.apply(
- stageName + "_CreateRightIndex", MapElements.via(
- new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
- .apply(CoGroupByKey.<BeamSqlRow>create());
- PCollection<BeamSqlRow> ret = coGbkResultCollection
- .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag,
- opType, all)));
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
deleted file mode 100644
index ba344df..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.io.Serializable;
-import java.lang.reflect.Type;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Top;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelCollationImpl;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamRelNode} to replace a {@code Sort} node.
- *
- * <p>Since Beam does not fully supported global sort we are using {@link Top} to implement
- * the {@code Sort} algebra. The following types of ORDER BY are supported:
-
- * <pre>{@code
- * select * from t order by id desc limit 10;
- * select * from t order by id desc limit 10, 5;
- * }</pre>
- *
- * <p>but Order BY without a limit is NOT supported:
- *
- * <pre>{@code
- * select * from t order by id desc
- * }</pre>
- *
- * <h3>Constraints</h3>
- * <ul>
- * <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT`
- * must fit into the memory of a single machine.</li>
- * <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`,
- * it does not make much sense to use `ORDER BY` with `WINDOW`.
- * </li>
- * </ul>
- */
-public class BeamSortRel extends Sort implements BeamRelNode {
- private List<Integer> fieldIndices = new ArrayList<>();
- private List<Boolean> orientation = new ArrayList<>();
- private List<Boolean> nullsFirst = new ArrayList<>();
-
- private int startIndex = 0;
- private int count;
-
- public BeamSortRel(
- RelOptCluster cluster,
- RelTraitSet traits,
- RelNode child,
- RelCollation collation,
- RexNode offset,
- RexNode fetch) {
- super(cluster, traits, child, collation, offset, fetch);
-
- List<RexNode> fieldExps = getChildExps();
- RelCollationImpl collationImpl = (RelCollationImpl) collation;
- List<RelFieldCollation> collations = collationImpl.getFieldCollations();
- for (int i = 0; i < fieldExps.size(); i++) {
- RexNode fieldExp = fieldExps.get(i);
- RexInputRef inputRef = (RexInputRef) fieldExp;
- fieldIndices.add(inputRef.getIndex());
- orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING);
-
- RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection;
- if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) {
- rawNullDirection = collations.get(i).getDirection().defaultNullDirection();
- }
- nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST);
- }
-
- if (fetch == null) {
- throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!");
- }
-
- RexLiteral fetchLiteral = (RexLiteral) fetch;
- count = ((BigDecimal) fetchLiteral.getValue()).intValue();
-
- if (offset != null) {
- RexLiteral offsetLiteral = (RexLiteral) offset;
- startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
- }
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
- .buildBeamPipeline(inputPCollections, sqlEnv);
- Type windowType = upstream.getWindowingStrategy().getWindowFn()
- .getWindowTypeDescriptor().getType();
- if (!windowType.equals(GlobalWindow.class)) {
- throw new UnsupportedOperationException(
- "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType);
- }
-
- BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation,
- nullsFirst);
- // first find the top (offset + count)
- PCollection<List<BeamSqlRow>> rawStream =
- upstream.apply("extractTopOffsetAndFetch",
- Top.of(startIndex + count, comparator).withoutDefaults())
- .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
-
- // strip the `leading offset`
- if (startIndex > 0) {
- rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
- new SubListFn<BeamSqlRow>(startIndex, startIndex + count)))
- .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
- }
-
- PCollection<BeamSqlRow> orderedStream = rawStream.apply(
- "flatten", Flatten.<BeamSqlRow>iterables());
- orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
- return orderedStream;
- }
-
- private static class SubListFn<T> extends DoFn<List<T>, List<T>> {
- private int startIndex;
- private int endIndex;
-
- public SubListFn(int startIndex, int endIndex) {
- this.startIndex = startIndex;
- this.endIndex = endIndex;
- }
-
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- ctx.output(ctx.element().subList(startIndex, endIndex));
- }
- }
-
- @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation,
- RexNode offset, RexNode fetch) {
- return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
- }
-
- private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable {
- private List<Integer> fieldsIndices;
- private List<Boolean> orientation;
- private List<Boolean> nullsFirst;
-
- public BeamSqlRowComparator(List<Integer> fieldsIndices,
- List<Boolean> orientation,
- List<Boolean> nullsFirst) {
- this.fieldsIndices = fieldsIndices;
- this.orientation = orientation;
- this.nullsFirst = nullsFirst;
- }
-
- @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) {
- for (int i = 0; i < fieldsIndices.size(); i++) {
- int fieldIndex = fieldsIndices.get(i);
- int fieldRet = 0;
- SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex);
- // whether NULL should be ordered first or last(compared to non-null values) depends on
- // what user specified in SQL(NULLS FIRST/NULLS LAST)
- if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
- continue;
- } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) {
- fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1);
- } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
- fieldRet = 1 * (nullsFirst.get(i) ? -1 : 1);
- } else {
- switch (fieldType) {
- case TINYINT:
- fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex));
- break;
- case SMALLINT:
- fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex));
- break;
- case INTEGER:
- fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex));
- break;
- case BIGINT:
- fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex));
- break;
- case FLOAT:
- fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex));
- break;
- case DOUBLE:
- fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex));
- break;
- case VARCHAR:
- fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex));
- break;
- case DATE:
- fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex));
- break;
- default:
- throw new UnsupportedOperationException(
- "Data type: " + fieldType + " not supported yet!");
- }
- }
-
- fieldRet *= (orientation.get(i) ? -1 : 1);
- if (fieldRet != 0) {
- return fieldRet;
- }
- }
- return 0;
- }
- }
-
- public static <T extends Number & Comparable> int numberCompare(T a, T b) {
- return a.compareTo(b);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java
deleted file mode 100644
index 9f1f703..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rel;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utilities for {@code BeamRelNode}.
- */
-class BeamSqlRelUtils {
- private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class);
-
- private static final AtomicInteger sequence = new AtomicInteger(0);
- private static final AtomicInteger classSequence = new AtomicInteger(0);
-
- public static String getStageName(BeamRelNode relNode) {
- return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
- + sequence.getAndIncrement();
- }
-
- public static String getClassName(BeamRelNode relNode) {
- return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
- + "_" + classSequence.getAndIncrement();
- }
-
- public static BeamRelNode getBeamRelInput(RelNode input) {
- if (input instanceof RelSubset) {
- // go with known best input
- input = ((RelSubset) input).getBest();
- }
- return (BeamRelNode) input;
- }
-
- public static String explain(final RelNode rel) {
- return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
- }
-
- public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
- String explain = "";
- try {
- explain = RelOptUtil.toString(rel);
- } catch (StackOverflowError e) {
- LOG.error("StackOverflowError occurred while extracting plan. "
- + "Please report it to the dev@ mailing list.");
- LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
- LOG.error("Forcing plan to empty string and continue... "
- + "SQL Runner may not working properly after.");
- }
- return explain;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
deleted file mode 100644
index c661585..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.core.Union;
-
-/**
- * {@link BeamRelNode} to replace a {@link Union}.
- *
- * <p>{@code BeamUnionRel} requires all of its inputs to have the same {@link WindowFn}. From
- * the SQL perspective, two cases are supported:
- *
- * <p>1) Do not use {@code grouped window function}:
- *
- * <pre>{@code
- *   select * from person UNION select * from person
- * }</pre>
- *
- * <p>2) Use the same {@code grouped window function}, with the same param:
- * <pre>{@code
- *   select id, count(*) from person
- *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- *   UNION
- *   select * from person
- *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- * }</pre>
- *
- * <p>Inputs with different group functions are NOT supported:
- * <pre>{@code
- *   select id, count(*) from person
- *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- *   UNION
- *   select * from person
- *   group by id, TUMBLE(order_time, INTERVAL '2' HOUR)
- * }</pre>
- */
-public class BeamUnionRel extends Union implements BeamRelNode {
-  // Performs the actual pipeline construction; set by every constructor.
-  private final BeamSetOperatorRelBase delegate;
-
-  public BeamUnionRel(RelOptCluster cluster,
-      RelTraitSet traits,
-      List<RelNode> inputs,
-      boolean all) {
-    super(cluster, traits, inputs, all);
-    this.delegate = new BeamSetOperatorRelBase(this,
-        BeamSetOperatorRelBase.OpType.UNION,
-        inputs, all);
-  }
-
-  /**
-   * Creates the node from a serialized {@link RelInput}.
-   */
-  public BeamUnionRel(RelInput input) {
-    super(input);
-    // Without a delegate here, buildBeamPipeline would fail with a NullPointerException
-    // for nodes created through this constructor.
-    this.delegate = new BeamSetOperatorRelBase(this,
-        BeamSetOperatorRelBase.OpType.UNION,
-        getInputs(), this.all);
-  }
-
-  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-    return new BeamUnionRel(getCluster(), traitSet, inputs, all);
-  }
-
-  /**
-   * Delegates pipeline construction to the shared set-operator implementation.
-   */
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
deleted file mode 100644
index 43b74c3..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.schema.BeamTableUtils;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.Values;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexLiteral;
-
-/**
- * {@code BeamRelNode} to replace a {@code Values} node.
- *
- * <p>{@code BeamValuesRel} will be used in the following SQLs:
- * <ul>
- * <li>{@code insert into t (name, desc) values ('hello', 'world')}</li>
- * <li>{@code select 1, '1', LOCALTIME}</li>
- * </ul>
- */
-public class BeamValuesRel extends Values implements BeamRelNode {
-
- public BeamValuesRel(
- RelOptCluster cluster,
- RelDataType rowType,
- ImmutableList<ImmutableList<RexLiteral>> tuples,
- RelTraitSet traits) {
- super(cluster, rowType, tuples, traits);
-
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
- String stageName = BeamSqlRelUtils.getStageName(this);
- if (tuples.isEmpty()) {
- throw new IllegalStateException("Values with empty tuples!");
- }
-
- BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
- for (ImmutableList<RexLiteral> tuple : tuples) {
- BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
- for (int i = 0; i < tuple.size(); i++) {
- BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
- }
- rows.add(row);
- }
-
- return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
- .setCoder(new BeamSqlRowCoder(beamSQLRowType));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
deleted file mode 100644
index 77d6204..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * BeamSQL-specific nodes that replace {@link org.apache.calcite.rel.RelNode}.
- *
- */
-package org.apache.beam.dsls.sql.rel;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
deleted file mode 100644
index 6e843d4..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rule;
-
-import com.google.common.collect.ImmutableList;
-import java.util.GregorianCalendar;
-import java.util.List;
-import org.apache.beam.dsls.sql.rel.BeamAggregationRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.joda.time.Duration;
-
-/**
- * Rule to detect the window/trigger settings.
- *
- * <p>Matches an {@link Aggregate} on top of a {@link Project} and replaces it with a
- * {@link BeamAggregationRel} that carries the window function, trigger, window field index
- * and allowed lateness derived from the {@code TUMBLE}, {@code HOP} or {@code SESSION}
- * call found among the group-by fields.
- */
-public class BeamAggregationRule extends RelOptRule {
-  public static final BeamAggregationRule INSTANCE =
-      new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
-
-  public BeamAggregationRule(
-      Class<? extends Aggregate> aggregateClass,
-      Class<? extends Project> projectClass,
-      RelBuilderFactory relBuilderFactory) {
-    super(
-        operand(aggregateClass,
-            operand(projectClass, any())),
-        relBuilderFactory, null);
-  }
-
-  public BeamAggregationRule(RelOptRuleOperand operand, String description) {
-    super(operand, description);
-  }
-
-  @Override
-  public void onMatch(RelOptRuleCall call) {
-    final Aggregate aggregate = call.rel(0);
-    final Project project = call.rel(1);
-    updateWindowTrigger(call, aggregate, project);
-  }
-
-  /**
-   * Derives the windowing settings from the group-by expressions and registers the
-   * resulting {@link BeamAggregationRel} as the transformation of {@code aggregate}.
-   *
-   * <p>Without a window call the defaults are: global windows, a repeated
-   * end-of-window watermark trigger, window field index {@code -1} and zero lateness.
-   */
-  private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate,
-      Project project) {
-    ImmutableBitSet groupByFields = aggregate.getGroupSet();
-    List<RexNode> projectMapping = project.getProjects();
-
-    WindowFn windowFn = new GlobalWindows();
-    Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow());
-    int windowFieldIdx = -1;
-    Duration allowedLateness = Duration.ZERO;
-
-    for (int groupField : groupByFields.asList()) {
-      RexNode projNode = projectMapping.get(groupField);
-      if (!(projNode instanceof RexCall)) {
-        continue;
-      }
-      SqlOperator op = ((RexCall) projNode).op;
-      ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
-      // Optional trailing delay argument of the window call, if present.
-      RexNode delayParameter = null;
-
-      switch (op.getName()) {
-        case "TUMBLE":
-          windowFieldIdx = groupField;
-          windowFn = FixedWindows
-              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
-          delayParameter = optionalParameter(parameters, 2);
-          break;
-        case "HOP":
-          windowFieldIdx = groupField;
-          // NOTE(review): parameter 1 is used as the window size and parameter 2 as the
-          // period; confirm this matches the argument order of the SQL HOP function.
-          windowFn = SlidingWindows
-              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))))
-              .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2))));
-          delayParameter = optionalParameter(parameters, 3);
-          break;
-        case "SESSION":
-          windowFieldIdx = groupField;
-          windowFn = Sessions
-              .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
-          delayParameter = optionalParameter(parameters, 2);
-          break;
-        default:
-          break;
-      }
-
-      if (delayParameter != null) {
-        GregorianCalendar delayTime =
-            (GregorianCalendar) ((RexLiteral) delayParameter).getValue();
-        triggerFn = createTriggerWithDelay(delayTime);
-        allowedLateness = Duration.millis(delayTime.getTimeInMillis());
-      }
-    }
-
-    BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(),
-        aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(aggregate.getInput(),
-            aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        aggregate.indicator,
-        aggregate.getGroupSet(),
-        aggregate.getGroupSets(),
-        aggregate.getAggCallList(),
-        windowFn,
-        triggerFn,
-        windowFieldIdx,
-        allowedLateness);
-    call.transformTo(newAggregator);
-  }
-
-  /**
-   * Returns the parameter at {@code index} when it is the last one of the call, i.e. when
-   * the call has exactly {@code index + 1} parameters; otherwise {@code null}.
-   */
-  private RexNode optionalParameter(List<RexNode> parameters, int index) {
-    return parameters.size() == index + 1 ? parameters.get(index) : null;
-  }
-
-  /**
-   * Builds a repeated end-of-window watermark trigger whose late firings happen the given
-   * delay after the first element in the pane, in processing time.
-   */
-  private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
-    return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
-        .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
-  }
-
-  /**
-   * Reads a numeric literal window parameter as a number of milliseconds.
-   *
-   * <p>The value is read as a {@code long}; narrowing it to {@code int} first would
-   * overflow for intervals longer than {@code Integer.MAX_VALUE} milliseconds
-   * (about 24.8 days).
-   *
-   * @throws IllegalArgumentException if {@code parameterNode} is not a numeric literal
-   */
-  private long getWindowParameterAsMillis(RexNode parameterNode) {
-    if (parameterNode instanceof RexLiteral) {
-      Object value = ((RexLiteral) parameterNode).getValue();
-      if (value instanceof Number) {
-        return ((Number) value).longValue();
-      }
-    }
-    throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java
deleted file mode 100644
index 414b666..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamFilterRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
-
-/**
- * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
- *
- */
-public class BeamFilterRule extends ConverterRule {
- public static final BeamFilterRule INSTANCE = new BeamFilterRule();
-
- private BeamFilterRule() {
- super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final Filter filter = (Filter) rel;
- final RelNode input = filter.getInput();
-
- return new BeamFilterRel(filter.getCluster(),
- filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- filter.getCondition());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
deleted file mode 100644
index 4cc4ef5..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rule;
-
-import java.util.List;
-
-import org.apache.beam.dsls.sql.rel.BeamIOSinkRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
-
-/**
- * A {@code ConverterRule} to replace {@link TableModify} with
- * {@link BeamIOSinkRel}.
- *
- */
-public class BeamIOSinkRule extends ConverterRule {
- public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
-
- private BeamIOSinkRule() {
- super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
- "BeamIOSinkRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final TableModify tableModify = (TableModify) rel;
- final RelNode input = tableModify.getInput();
-
- final RelOptCluster cluster = tableModify.getCluster();
- final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
- final RelOptTable relOptTable = tableModify.getTable();
- final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
- final RelNode convertedInput = convert(input,
- input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
- final TableModify.Operation operation = tableModify.getOperation();
- final List<String> updateColumnList = tableModify.getUpdateColumnList();
- final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
- final boolean flattened = tableModify.isFlattened();
-
- final Table table = tableModify.getTable().unwrap(Table.class);
-
- switch (table.getJdbcTableType()) {
- case TABLE:
- case STREAM:
- if (operation != TableModify.Operation.INSERT) {
- throw new UnsupportedOperationException(
- String.format("Streams doesn't support %s modify operation", operation));
- }
- return new BeamIOSinkRel(cluster, traitSet,
- relOptTable, catalogReader, convertedInput, operation, updateColumnList,
- sourceExpressionList, flattened);
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported table type: %s", table.getJdbcTableType()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
deleted file mode 100644
index 85a69ff..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamIOSourceRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalTableScan;
-
-/**
- * A {@code ConverterRule} to replace {@link TableScan} with
- * {@link BeamIOSourceRel}.
- *
- */
-public class BeamIOSourceRule extends ConverterRule {
- public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
-
- private BeamIOSourceRule() {
- super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
- "BeamIOSourceRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final TableScan scan = (TableScan) rel;
-
- return new BeamIOSourceRel(scan.getCluster(),
- scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
deleted file mode 100644
index 70716c5..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import java.util.List;
-
-import org.apache.beam.dsls.sql.rel.BeamIntersectRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Intersect;
-import org.apache.calcite.rel.logical.LogicalIntersect;
-
-/**
- * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}.
- */
-public class BeamIntersectRule extends ConverterRule {
- public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();
- private BeamIntersectRule() {
- super(LogicalIntersect.class, Convention.NONE,
- BeamLogicalConvention.INSTANCE, "BeamIntersectRule");
- }
-
- @Override public RelNode convert(RelNode rel) {
- Intersect intersect = (Intersect) rel;
- final List<RelNode> inputs = intersect.getInputs();
- return new BeamIntersectRel(
- intersect.getCluster(),
- intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convertList(inputs, BeamLogicalConvention.INSTANCE),
- intersect.all
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
deleted file mode 100644
index 78253fe..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamJoinRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.logical.LogicalJoin;
-
-/**
- * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
- */
-public class BeamJoinRule extends ConverterRule {
- public static final BeamJoinRule INSTANCE = new BeamJoinRule();
- private BeamJoinRule() {
- super(LogicalJoin.class, Convention.NONE,
- BeamLogicalConvention.INSTANCE, "BeamJoinRule");
- }
-
- @Override public RelNode convert(RelNode rel) {
- Join join = (Join) rel;
- return new BeamJoinRel(
- join.getCluster(),
- join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convert(join.getLeft(),
- join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- convert(join.getRight(),
- join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- join.getCondition(),
- join.getVariablesSet(),
- join.getJoinType()
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
deleted file mode 100644
index ca93c71..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import java.util.List;
-
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.dsls.sql.rel.BeamMinusRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Minus;
-import org.apache.calcite.rel.logical.LogicalMinus;
-
-/**
- * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}.
- */
-public class BeamMinusRule extends ConverterRule {
- public static final BeamMinusRule INSTANCE = new BeamMinusRule();
- private BeamMinusRule() {
- super(LogicalMinus.class, Convention.NONE,
- BeamLogicalConvention.INSTANCE, "BeamMinusRule");
- }
-
- @Override public RelNode convert(RelNode rel) {
- Minus minus = (Minus) rel;
- final List<RelNode> inputs = minus.getInputs();
- return new BeamMinusRel(
- minus.getCluster(),
- minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convertList(inputs, BeamLogicalConvention.INSTANCE),
- minus.all
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
deleted file mode 100644
index 6dc3b57..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
-
-/**
- * A {@code ConverterRule} to replace {@link Project} with
- * {@link BeamProjectRel}.
- *
- */
-public class BeamProjectRule extends ConverterRule {
- public static final BeamProjectRule INSTANCE = new BeamProjectRule();
-
- private BeamProjectRule() {
- super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final Project project = (Project) rel;
- final RelNode input = project.getInput();
-
- return new BeamProjectRel(project.getCluster(),
- project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- project.getProjects(), project.getRowType());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
deleted file mode 100644
index d802e9d..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-
-import org.apache.beam.dsls.sql.rel.BeamSortRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.logical.LogicalSort;
-
-/**
- * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}.
- */
-public class BeamSortRule extends ConverterRule {
- public static final BeamSortRule INSTANCE = new BeamSortRule();
- private BeamSortRule() {
- super(LogicalSort.class, Convention.NONE,
- BeamLogicalConvention.INSTANCE, "BeamSortRule");
- }
-
- @Override public RelNode convert(RelNode rel) {
- Sort sort = (Sort) rel;
- final RelNode input = sort.getInput();
- return new BeamSortRel(
- sort.getCluster(),
- sort.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- sort.getCollation(),
- sort.offset,
- sort.fetch
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
deleted file mode 100644
index b8430b9..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.dsls.sql.rel.BeamUnionRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Union;
-import org.apache.calcite.rel.logical.LogicalUnion;
-
-/**
- * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with
- * {@link BeamUnionRule}.
- */
-public class BeamUnionRule extends ConverterRule {
- public static final BeamUnionRule INSTANCE = new BeamUnionRule();
- private BeamUnionRule() {
- super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
- "BeamUnionRule");
- }
-
- @Override public RelNode convert(RelNode rel) {
- Union union = (Union) rel;
-
- return new BeamUnionRel(
- union.getCluster(),
- union.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convertList(union.getInputs(), BeamLogicalConvention.INSTANCE),
- union.all
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
deleted file mode 100644
index 4ea9e60..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.dsls.sql.rel.BeamValuesRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Values;
-import org.apache.calcite.rel.logical.LogicalValues;
-
-/**
- * {@code ConverterRule} to replace {@code Values} with {@code BeamValuesRel}.
- */
-public class BeamValuesRule extends ConverterRule {
- public static final BeamValuesRule INSTANCE = new BeamValuesRule();
- private BeamValuesRule() {
- super(LogicalValues.class, Convention.NONE,
- BeamLogicalConvention.INSTANCE, "BeamValuesRule");
- }
-
- @Override public RelNode convert(RelNode rel) {
- Values values = (Values) rel;
- return new BeamValuesRel(
- values.getCluster(),
- values.getRowType(),
- values.getTuples(),
- values.getTraitSet().replace(BeamLogicalConvention.INSTANCE)
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
deleted file mode 100644
index 5d32647..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.calcite.plan.RelOptRule} to generate
- * {@link org.apache.beam.dsls.sql.rel.BeamRelNode}.
- */
-package org.apache.beam.dsls.sql.rule;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
deleted file mode 100644
index dfa2785..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
- */
-public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
- protected BeamSqlRowType beamSqlRowType;
- public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
- this.beamSqlRowType = beamSqlRowType;
- }
-
- @Override public BeamSqlRowType getRowType() {
- return beamSqlRowType;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
deleted file mode 100644
index 502e8c1..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Type as a source IO, determined whether it's a STREAMING process, or batch
- * process.
- */
-public enum BeamIOType implements Serializable {
- BOUNDED, UNBOUNDED;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
deleted file mode 100644
index 5b63780..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
- * then a downstream query can query directly.
- */
-public class BeamPCollectionTable extends BaseBeamTable {
- private BeamIOType ioType;
- private transient PCollection<BeamSqlRow> upstream;
-
- protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
- super(beamSqlRowType);
- }
-
- public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
- BeamSqlRowType beamSqlRowType){
- this(beamSqlRowType);
- ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
- ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
- this.upstream = upstream;
- }
-
- @Override
- public BeamIOType getSourceType() {
- return ioType;
- }
-
- @Override
- public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
- return upstream;
- }
-
- @Override
- public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
- throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
deleted file mode 100644
index d789446..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.Instant;
-
-/**
- * Represent a generic ROW record in Beam SQL.
- *
- */
-public class BeamSqlRow implements Serializable {
- private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
- static {
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
- }
-
- private List<Integer> nullFields = new ArrayList<>();
- private List<Object> dataValues;
- private BeamSqlRowType dataType;
-
- private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
- private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
-
- public BeamSqlRow(BeamSqlRowType dataType) {
- this.dataType = dataType;
- this.dataValues = new ArrayList<>();
- for (int idx = 0; idx < dataType.size(); ++idx) {
- dataValues.add(null);
- nullFields.add(idx);
- }
- }
-
- public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
- this(dataType);
- for (int idx = 0; idx < dataValues.size(); ++idx) {
- addField(idx, dataValues.get(idx));
- }
- }
-
- public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){
- windowStart = upstreamRecord.windowStart;
- windowEnd = upstreamRecord.windowEnd;
-
- if (window instanceof IntervalWindow) {
- IntervalWindow iWindow = (IntervalWindow) window;
- windowStart = iWindow.start();
- windowEnd = iWindow.end();
- }
- }
-
- public void addField(String fieldName, Object fieldValue) {
- addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
- }
-
- public void addField(int index, Object fieldValue) {
- if (fieldValue == null) {
- return;
- } else {
- if (nullFields.contains(index)) {
- nullFields.remove(nullFields.indexOf(index));
- }
- }
-
- validateValueType(index, fieldValue);
- dataValues.set(index, fieldValue);
- }
-
- private void validateValueType(int index, Object fieldValue) {
- SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
- Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
- if (javaClazz == null) {
- throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
- }
-
- if (!fieldValue.getClass().equals(javaClazz)) {
- throw new IllegalArgumentException(
- String.format("[%s](%s) doesn't match type [%s]",
- fieldValue, fieldValue.getClass(), fieldType)
- );
- }
- }
-
- public Object getFieldValue(String fieldName) {
- return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
- }
-
- public byte getByte(String fieldName) {
- return (Byte) getFieldValue(fieldName);
- }
-
- public short getShort(String fieldName) {
- return (Short) getFieldValue(fieldName);
- }
-
- public int getInteger(String fieldName) {
- return (Integer) getFieldValue(fieldName);
- }
-
- public float getFloat(String fieldName) {
- return (Float) getFieldValue(fieldName);
- }
-
- public double getDouble(String fieldName) {
- return (Double) getFieldValue(fieldName);
- }
-
- public long getLong(String fieldName) {
- return (Long) getFieldValue(fieldName);
- }
-
- public String getString(String fieldName) {
- return (String) getFieldValue(fieldName);
- }
-
- public Date getDate(String fieldName) {
- return (Date) getFieldValue(fieldName);
- }
-
- public GregorianCalendar getGregorianCalendar(String fieldName) {
- return (GregorianCalendar) getFieldValue(fieldName);
- }
-
- public BigDecimal getBigDecimal(String fieldName) {
- return (BigDecimal) getFieldValue(fieldName);
- }
-
- public boolean getBoolean(String fieldName) {
- return (boolean) getFieldValue(fieldName);
- }
-
- public Object getFieldValue(int fieldIdx) {
- if (nullFields.contains(fieldIdx)) {
- return null;
- }
-
- return dataValues.get(fieldIdx);
- }
-
- public byte getByte(int idx) {
- return (Byte) getFieldValue(idx);
- }
-
- public short getShort(int idx) {
- return (Short) getFieldValue(idx);
- }
-
- public int getInteger(int idx) {
- return (Integer) getFieldValue(idx);
- }
-
- public float getFloat(int idx) {
- return (Float) getFieldValue(idx);
- }
-
- public double getDouble(int idx) {
- return (Double) getFieldValue(idx);
- }
-
- public long getLong(int idx) {
- return (Long) getFieldValue(idx);
- }
-
- public String getString(int idx) {
- return (String) getFieldValue(idx);
- }
-
- public Date getDate(int idx) {
- return (Date) getFieldValue(idx);
- }
-
- public GregorianCalendar getGregorianCalendar(int idx) {
- return (GregorianCalendar) getFieldValue(idx);
- }
-
- public BigDecimal getBigDecimal(int idx) {
- return (BigDecimal) getFieldValue(idx);
- }
-
- public boolean getBoolean(int idx) {
- return (boolean) getFieldValue(idx);
- }
-
- public int size() {
- return dataValues.size();
- }
-
- public List<Object> getDataValues() {
- return dataValues;
- }
-
- public void setDataValues(List<Object> dataValues) {
- this.dataValues = dataValues;
- }
-
- public BeamSqlRowType getDataType() {
- return dataType;
- }
-
- public void setDataType(BeamSqlRowType dataType) {
- this.dataType = dataType;
- }
-
- public void setNullFields(List<Integer> nullFields) {
- this.nullFields = nullFields;
- }
-
- public List<Integer> getNullFields() {
- return nullFields;
- }
-
- /**
- * is the specified field NULL?
- */
- public boolean isNull(int idx) {
- return nullFields.contains(idx);
- }
-
- public Instant getWindowStart() {
- return windowStart;
- }
-
- public Instant getWindowEnd() {
- return windowEnd;
- }
-
- public void setWindowStart(Instant windowStart) {
- this.windowStart = windowStart;
- }
-
- public void setWindowEnd(Instant windowEnd) {
- this.windowEnd = windowEnd;
- }
-
- @Override
- public String toString() {
- return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
- + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
- }
-
- /**
- * Return data fields as key=value.
- */
- public String valueInString() {
- StringBuilder sb = new StringBuilder();
- for (int idx = 0; idx < size(); ++idx) {
- sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
- }
- return sb.substring(1);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- BeamSqlRow other = (BeamSqlRow) obj;
- return toString().equals(other.toString());
- }
-
- @Override public int hashCode() {
- return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
deleted file mode 100644
index f14864a..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.List;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.DoubleCoder;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-
-/**
- * A {@link Coder} encodes {@link BeamSqlRow}.
- */
-public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
- private BeamSqlRowType tableSchema;
-
- private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
-
- private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
- private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
- private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
- private static final DoubleCoder doubleCoder = DoubleCoder.of();
- private static final InstantCoder instantCoder = InstantCoder.of();
- private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
- private static final ByteCoder byteCoder = ByteCoder.of();
-
- public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
- this.tableSchema = tableSchema;
- }
-
- @Override
- public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
- listCoder.encode(value.getNullFields(), outStream);
- for (int idx = 0; idx < value.size(); ++idx) {
- if (value.getNullFields().contains(idx)) {
- continue;
- }
-
- switch (CalciteUtils.getFieldType(value.getDataType(), idx)) {
- case INTEGER:
- intCoder.encode(value.getInteger(idx), outStream);
- break;
- case SMALLINT:
- intCoder.encode((int) value.getShort(idx), outStream);
- break;
- case TINYINT:
- byteCoder.encode(value.getByte(idx), outStream);
- break;
- case DOUBLE:
- doubleCoder.encode(value.getDouble(idx), outStream);
- break;
- case FLOAT:
- doubleCoder.encode((double) value.getFloat(idx), outStream);
- break;
- case DECIMAL:
- bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
- break;
- case BIGINT:
- longCoder.encode(value.getLong(idx), outStream);
- break;
- case VARCHAR:
- case CHAR:
- stringCoder.encode(value.getString(idx), outStream);
- break;
- case TIME:
- longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
- break;
- case DATE:
- case TIMESTAMP:
- longCoder.encode(value.getDate(idx).getTime(), outStream);
- break;
- case BOOLEAN:
- byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream);
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Data type: " + value.getDataType().getFieldsType().get(idx) + " not supported yet!");
- }
- }
-
- instantCoder.encode(value.getWindowStart(), outStream);
- instantCoder.encode(value.getWindowEnd(), outStream);
- }
-
- @Override
- public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
- List<Integer> nullFields = listCoder.decode(inStream);
-
- BeamSqlRow record = new BeamSqlRow(tableSchema);
- record.setNullFields(nullFields);
- for (int idx = 0; idx < tableSchema.size(); ++idx) {
- if (nullFields.contains(idx)) {
- continue;
- }
-
- switch (CalciteUtils.getFieldType(tableSchema, idx)) {
- case INTEGER:
- record.addField(idx, intCoder.decode(inStream));
- break;
- case SMALLINT:
- record.addField(idx, intCoder.decode(inStream).shortValue());
- break;
- case TINYINT:
- record.addField(idx, byteCoder.decode(inStream));
- break;
- case DOUBLE:
- record.addField(idx, doubleCoder.decode(inStream));
- break;
- case FLOAT:
- record.addField(idx, doubleCoder.decode(inStream).floatValue());
- break;
- case BIGINT:
- record.addField(idx, longCoder.decode(inStream));
- break;
- case DECIMAL:
- record.addField(idx, bigDecimalCoder.decode(inStream));
- break;
- case VARCHAR:
- case CHAR:
- record.addField(idx, stringCoder.decode(inStream));
- break;
- case TIME:
- GregorianCalendar calendar = new GregorianCalendar();
- calendar.setTime(new Date(longCoder.decode(inStream)));
- record.addField(idx, calendar);
- break;
- case DATE:
- case TIMESTAMP:
- record.addField(idx, new Date(longCoder.decode(inStream)));
- break;
- case BOOLEAN:
- record.addField(idx, byteCoder.decode(inStream) == 1);
- break;
-
- default:
- throw new UnsupportedOperationException("Data type: "
- + CalciteUtils.toCalciteType(tableSchema.getFieldsType().get(idx))
- + " not supported yet!");
- }
- }
-
- record.setWindowStart(instantCoder.decode(inStream));
- record.setWindowEnd(instantCoder.decode(inStream));
-
- return record;
- }
-
- public BeamSqlRowType getTableSchema() {
- return tableSchema;
- }
-
- @Override
- public void verifyDeterministic()
- throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
deleted file mode 100644
index 1129bdd..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Field type information in {@link BeamSqlRow}.
- *
- */
-@AutoValue
-public abstract class BeamSqlRowType implements Serializable {
- public abstract List<String> getFieldsName();
- public abstract List<Integer> getFieldsType();
-
- public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
- return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
- }
-
- public int size() {
- return getFieldsName().size();
- }
-}