You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:55 UTC
[32/59] beam git commit: rename package org.apache.beam.dsls.sql to
org.apache.beam.sdk.extensions.sql
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
new file mode 100644
index 0000000..fb0a8e2
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}.
+ *
+ */
+package org.apache.beam.sdk.extensions.sql.rel;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
new file mode 100644
index 0000000..17e3f80
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import com.google.common.collect.ImmutableList;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.rel.BeamAggregationRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.joda.time.Duration;
+
+/**
+ * Rule to detect the window/trigger settings.
+ *
+ */
+public class BeamAggregationRule extends RelOptRule {
+ public static final BeamAggregationRule INSTANCE =
+ new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
+
+ public BeamAggregationRule(
+ Class<? extends Aggregate> aggregateClass,
+ Class<? extends Project> projectClass,
+ RelBuilderFactory relBuilderFactory) {
+ super(
+ operand(aggregateClass,
+ operand(projectClass, any())),
+ relBuilderFactory, null);
+ }
+
+ public BeamAggregationRule(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Aggregate aggregate = call.rel(0);
+ final Project project = call.rel(1);
+ updateWindowTrigger(call, aggregate, project);
+ }
+
+ private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate,
+ Project project) {
+ ImmutableBitSet groupByFields = aggregate.getGroupSet();
+ List<RexNode> projectMapping = project.getProjects();
+
+ WindowFn windowFn = new GlobalWindows();
+ Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow());
+ int windowFieldIdx = -1;
+ Duration allowedLatence = Duration.ZERO;
+
+ for (int groupField : groupByFields.asList()) {
+ RexNode projNode = projectMapping.get(groupField);
+ if (projNode instanceof RexCall) {
+ SqlOperator op = ((RexCall) projNode).op;
+ ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
+ String functionName = op.getName();
+ switch (functionName) {
+ case "TUMBLE":
+ windowFieldIdx = groupField;
+ windowFn = FixedWindows
+ .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
+ if (parameters.size() == 3) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+ }
+ break;
+ case "HOP":
+ windowFieldIdx = groupField;
+ windowFn = SlidingWindows
+ .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))))
+ .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2))));
+ if (parameters.size() == 4) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+ }
+ break;
+ case "SESSION":
+ windowFieldIdx = groupField;
+ windowFn = Sessions
+ .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
+ if (parameters.size() == 3) {
+ GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+ .getValue();
+ triggerFn = createTriggerWithDelay(delayTime);
+ allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(),
+ aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(aggregate.getInput(),
+ aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ aggregate.indicator,
+ aggregate.getGroupSet(),
+ aggregate.getGroupSets(),
+ aggregate.getAggCallList(),
+ windowFn,
+ triggerFn,
+ windowFieldIdx,
+ allowedLatence);
+ call.transformTo(newAggregator);
+ }
+
+ private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
+ return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
+ .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
+ }
+
+ private long getWindowParameterAsMillis(RexNode parameterNode) {
+ if (parameterNode instanceof RexLiteral) {
+ return RexLiteral.intValue(parameterNode);
+ } else {
+ throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
new file mode 100644
index 0000000..b30a9d9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+
+/**
+ * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
+ *
+ */
+public class BeamFilterRule extends ConverterRule {
+ public static final BeamFilterRule INSTANCE = new BeamFilterRule();
+
+ private BeamFilterRule() {
+ super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Filter filter = (Filter) rel;
+ final RelNode input = filter.getInput();
+
+ return new BeamFilterRel(filter.getCluster(),
+ filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ filter.getCondition());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
new file mode 100644
index 0000000..54079b0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.rel.BeamIOSinkRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableModify} with
+ * {@link BeamIOSinkRel}.
+ *
+ */
+public class BeamIOSinkRule extends ConverterRule {
+ public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
+
+ private BeamIOSinkRule() {
+ super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+ "BeamIOSinkRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableModify tableModify = (TableModify) rel;
+ final RelNode input = tableModify.getInput();
+
+ final RelOptCluster cluster = tableModify.getCluster();
+ final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
+ final RelOptTable relOptTable = tableModify.getTable();
+ final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+ final RelNode convertedInput = convert(input,
+ input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+ final TableModify.Operation operation = tableModify.getOperation();
+ final List<String> updateColumnList = tableModify.getUpdateColumnList();
+ final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+ final boolean flattened = tableModify.isFlattened();
+
+ final Table table = tableModify.getTable().unwrap(Table.class);
+
+ switch (table.getJdbcTableType()) {
+ case TABLE:
+ case STREAM:
+ if (operation != TableModify.Operation.INSERT) {
+ throw new UnsupportedOperationException(
+ String.format("Streams doesn't support %s modify operation", operation));
+ }
+ return new BeamIOSinkRel(cluster, traitSet,
+ relOptTable, catalogReader, convertedInput, operation, updateColumnList,
+ sourceExpressionList, flattened);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported table type: %s", table.getJdbcTableType()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
new file mode 100644
index 0000000..496b977
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamIOSourceRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableScan} with
+ * {@link BeamIOSourceRel}.
+ *
+ */
+public class BeamIOSourceRule extends ConverterRule {
+ public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
+
+ private BeamIOSourceRule() {
+ super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+ "BeamIOSourceRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableScan scan = (TableScan) rel;
+
+ return new BeamIOSourceRel(scan.getCluster(),
+ scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
new file mode 100644
index 0000000..6fdbd9b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.rel.BeamIntersectRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+
+/**
+ * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}.
+ */
+public class BeamIntersectRule extends ConverterRule {
+ public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();
+ private BeamIntersectRule() {
+ super(LogicalIntersect.class, Convention.NONE,
+ BeamLogicalConvention.INSTANCE, "BeamIntersectRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Intersect intersect = (Intersect) rel;
+ final List<RelNode> inputs = intersect.getInputs();
+ return new BeamIntersectRel(
+ intersect.getCluster(),
+ intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convertList(inputs, BeamLogicalConvention.INSTANCE),
+ intersect.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
new file mode 100644
index 0000000..147932e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamJoinRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+/**
+ * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
+ */
+public class BeamJoinRule extends ConverterRule {
+ public static final BeamJoinRule INSTANCE = new BeamJoinRule();
+ private BeamJoinRule() {
+ super(LogicalJoin.class, Convention.NONE,
+ BeamLogicalConvention.INSTANCE, "BeamJoinRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Join join = (Join) rel;
+ return new BeamJoinRel(
+ join.getCluster(),
+ join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(join.getLeft(),
+ join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ convert(join.getRight(),
+ join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ join.getCondition(),
+ join.getVariablesSet(),
+ join.getJoinType()
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java
new file mode 100644
index 0000000..363cf3b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamMinusRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.logical.LogicalMinus;
+
+/**
+ * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}.
+ */
+public class BeamMinusRule extends ConverterRule {
+ public static final BeamMinusRule INSTANCE = new BeamMinusRule();
+ private BeamMinusRule() {
+ super(LogicalMinus.class, Convention.NONE,
+ BeamLogicalConvention.INSTANCE, "BeamMinusRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Minus minus = (Minus) rel;
+ final List<RelNode> inputs = minus.getInputs();
+ return new BeamMinusRel(
+ minus.getCluster(),
+ minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convertList(inputs, BeamLogicalConvention.INSTANCE),
+ minus.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java
new file mode 100644
index 0000000..4f2f8c9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+
+/**
+ * A {@code ConverterRule} to replace {@link Project} with
+ * {@link BeamProjectRel}.
+ *
+ */
+public class BeamProjectRule extends ConverterRule {
+ public static final BeamProjectRule INSTANCE = new BeamProjectRule();
+
+ private BeamProjectRule() {
+ super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Project project = (Project) rel;
+ final RelNode input = project.getInput();
+
+ return new BeamProjectRel(project.getCluster(),
+ project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ project.getProjects(), project.getRowType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java
new file mode 100644
index 0000000..e104d37
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamSortRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalSort;
+
+/**
+ * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}.
+ */
+public class BeamSortRule extends ConverterRule {
+ public static final BeamSortRule INSTANCE = new BeamSortRule();
+ private BeamSortRule() {
+ super(LogicalSort.class, Convention.NONE,
+ BeamLogicalConvention.INSTANCE, "BeamSortRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Sort sort = (Sort) rel;
+ final RelNode input = sort.getInput();
+ return new BeamSortRel(
+ sort.getCluster(),
+ sort.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ sort.getCollation(),
+ sort.offset,
+ sort.fetch
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java
new file mode 100644
index 0000000..975ccbc
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamUnionRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalUnion;
+
+/**
+ * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with
+ * {@link BeamUnionRule}.
+ */
+public class BeamUnionRule extends ConverterRule {
+ public static final BeamUnionRule INSTANCE = new BeamUnionRule();
+ private BeamUnionRule() {
+ super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+ "BeamUnionRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Union union = (Union) rel;
+
+ return new BeamUnionRel(
+ union.getCluster(),
+ union.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convertList(union.getInputs(), BeamLogicalConvention.INSTANCE),
+ union.all
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java
new file mode 100644
index 0000000..86a8f72
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamValuesRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.logical.LogicalValues;
+
+/**
+ * {@code ConverterRule} to replace {@code Values} with {@code BeamValuesRel}.
+ */
+public class BeamValuesRule extends ConverterRule {
+ public static final BeamValuesRule INSTANCE = new BeamValuesRule();
+ private BeamValuesRule() {
+ super(LogicalValues.class, Convention.NONE,
+ BeamLogicalConvention.INSTANCE, "BeamValuesRule");
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ Values values = (Values) rel;
+ return new BeamValuesRel(
+ values.getCluster(),
+ values.getRowType(),
+ values.getTuples(),
+ values.getTraitSet().replace(BeamLogicalConvention.INSTANCE)
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
new file mode 100644
index 0000000..f57cdee
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.calcite.plan.RelOptRule} to generate
+ * {@link org.apache.beam.sdk.extensions.sql.rel.BeamRelNode}.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
new file mode 100644
index 0000000..bf41c95
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
+ */
+public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
+ protected BeamSqlRowType beamSqlRowType;
+ public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
+ this.beamSqlRowType = beamSqlRowType;
+ }
+
+ @Override public BeamSqlRowType getRowType() {
+ return beamSqlRowType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
new file mode 100644
index 0000000..bda3ca1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Type as a source IO, determined whether it's a STREAMING process, or batch
+ * process.
+ */
+public enum BeamIOType implements Serializable {
+ BOUNDED, UNBOUNDED;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
new file mode 100644
index 0000000..5bbb8fd
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
+ * then a downstream query can query directly.
+ */
+public class BeamPCollectionTable extends BaseBeamTable {
+ private BeamIOType ioType;
+ private transient PCollection<BeamSqlRow> upstream;
+
+ protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
+ super(beamSqlRowType);
+ }
+
+ public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
+ BeamSqlRowType beamSqlRowType){
+ this(beamSqlRowType);
+ ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
+ ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
+ this.upstream = upstream;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return ioType;
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ return upstream;
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
new file mode 100644
index 0000000..616e9f3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Instant;
+
+/**
+ * Represent a generic ROW record in Beam SQL.
+ *
+ */
+public class BeamSqlRow implements Serializable {
+ private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+ static {
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+ }
+
+ private List<Integer> nullFields = new ArrayList<>();
+ private List<Object> dataValues;
+ private BeamSqlRowType dataType;
+
+ private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+ private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+
+ public BeamSqlRow(BeamSqlRowType dataType) {
+ this.dataType = dataType;
+ this.dataValues = new ArrayList<>();
+ for (int idx = 0; idx < dataType.size(); ++idx) {
+ dataValues.add(null);
+ nullFields.add(idx);
+ }
+ }
+
+ public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
+ this(dataType);
+ for (int idx = 0; idx < dataValues.size(); ++idx) {
+ addField(idx, dataValues.get(idx));
+ }
+ }
+
+ public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){
+ windowStart = upstreamRecord.windowStart;
+ windowEnd = upstreamRecord.windowEnd;
+
+ if (window instanceof IntervalWindow) {
+ IntervalWindow iWindow = (IntervalWindow) window;
+ windowStart = iWindow.start();
+ windowEnd = iWindow.end();
+ }
+ }
+
+ public void addField(String fieldName, Object fieldValue) {
+ addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+ }
+
+ public void addField(int index, Object fieldValue) {
+ if (fieldValue == null) {
+ return;
+ } else {
+ if (nullFields.contains(index)) {
+ nullFields.remove(nullFields.indexOf(index));
+ }
+ }
+
+ validateValueType(index, fieldValue);
+ dataValues.set(index, fieldValue);
+ }
+
+ private void validateValueType(int index, Object fieldValue) {
+ SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
+ Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
+ if (javaClazz == null) {
+ throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
+ }
+
+ if (!fieldValue.getClass().equals(javaClazz)) {
+ throw new IllegalArgumentException(
+ String.format("[%s](%s) doesn't match type [%s]",
+ fieldValue, fieldValue.getClass(), fieldType)
+ );
+ }
+ }
+
+ public Object getFieldValue(String fieldName) {
+ return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+ }
+
+ public byte getByte(String fieldName) {
+ return (Byte) getFieldValue(fieldName);
+ }
+
+ public short getShort(String fieldName) {
+ return (Short) getFieldValue(fieldName);
+ }
+
+ public int getInteger(String fieldName) {
+ return (Integer) getFieldValue(fieldName);
+ }
+
+ public float getFloat(String fieldName) {
+ return (Float) getFieldValue(fieldName);
+ }
+
+ public double getDouble(String fieldName) {
+ return (Double) getFieldValue(fieldName);
+ }
+
+ public long getLong(String fieldName) {
+ return (Long) getFieldValue(fieldName);
+ }
+
+ public String getString(String fieldName) {
+ return (String) getFieldValue(fieldName);
+ }
+
+ public Date getDate(String fieldName) {
+ return (Date) getFieldValue(fieldName);
+ }
+
+ public GregorianCalendar getGregorianCalendar(String fieldName) {
+ return (GregorianCalendar) getFieldValue(fieldName);
+ }
+
+ public BigDecimal getBigDecimal(String fieldName) {
+ return (BigDecimal) getFieldValue(fieldName);
+ }
+
+ public boolean getBoolean(String fieldName) {
+ return (boolean) getFieldValue(fieldName);
+ }
+
+ public Object getFieldValue(int fieldIdx) {
+ if (nullFields.contains(fieldIdx)) {
+ return null;
+ }
+
+ return dataValues.get(fieldIdx);
+ }
+
+ public byte getByte(int idx) {
+ return (Byte) getFieldValue(idx);
+ }
+
+ public short getShort(int idx) {
+ return (Short) getFieldValue(idx);
+ }
+
+ public int getInteger(int idx) {
+ return (Integer) getFieldValue(idx);
+ }
+
+ public float getFloat(int idx) {
+ return (Float) getFieldValue(idx);
+ }
+
+ public double getDouble(int idx) {
+ return (Double) getFieldValue(idx);
+ }
+
+ public long getLong(int idx) {
+ return (Long) getFieldValue(idx);
+ }
+
+ public String getString(int idx) {
+ return (String) getFieldValue(idx);
+ }
+
+ public Date getDate(int idx) {
+ return (Date) getFieldValue(idx);
+ }
+
+ public GregorianCalendar getGregorianCalendar(int idx) {
+ return (GregorianCalendar) getFieldValue(idx);
+ }
+
+ public BigDecimal getBigDecimal(int idx) {
+ return (BigDecimal) getFieldValue(idx);
+ }
+
+ public boolean getBoolean(int idx) {
+ return (boolean) getFieldValue(idx);
+ }
+
+ public int size() {
+ return dataValues.size();
+ }
+
+ public List<Object> getDataValues() {
+ return dataValues;
+ }
+
+ public void setDataValues(List<Object> dataValues) {
+ this.dataValues = dataValues;
+ }
+
+ public BeamSqlRowType getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(BeamSqlRowType dataType) {
+ this.dataType = dataType;
+ }
+
+ public void setNullFields(List<Integer> nullFields) {
+ this.nullFields = nullFields;
+ }
+
+ public List<Integer> getNullFields() {
+ return nullFields;
+ }
+
+ /**
+ * is the specified field NULL?
+ */
+ public boolean isNull(int idx) {
+ return nullFields.contains(idx);
+ }
+
+ public Instant getWindowStart() {
+ return windowStart;
+ }
+
+ public Instant getWindowEnd() {
+ return windowEnd;
+ }
+
+ public void setWindowStart(Instant windowStart) {
+ this.windowStart = windowStart;
+ }
+
+ public void setWindowEnd(Instant windowEnd) {
+ this.windowEnd = windowEnd;
+ }
+
+ @Override
+ public String toString() {
+ return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
+ + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
+ }
+
+ /**
+ * Return data fields as key=value.
+ */
+ public String valueInString() {
+ StringBuilder sb = new StringBuilder();
+ for (int idx = 0; idx < size(); ++idx) {
+ sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
+ }
+ return sb.substring(1);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ BeamSqlRow other = (BeamSqlRow) obj;
+ return toString().equals(other.toString());
+ }
+
+ @Override public int hashCode() {
+ return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
new file mode 100644
index 0000000..39e2fd3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+
+/**
+ * A {@link Coder} encodes {@link BeamSqlRow}.
+ */
+public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
+ private BeamSqlRowType tableSchema;
+
+ private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
+
+ private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+ private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
+ private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+ private static final DoubleCoder doubleCoder = DoubleCoder.of();
+ private static final InstantCoder instantCoder = InstantCoder.of();
+ private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
+ private static final ByteCoder byteCoder = ByteCoder.of();
+
+ public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ @Override
+ public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
+ listCoder.encode(value.getNullFields(), outStream);
+ for (int idx = 0; idx < value.size(); ++idx) {
+ if (value.getNullFields().contains(idx)) {
+ continue;
+ }
+
+ switch (CalciteUtils.getFieldType(value.getDataType(), idx)) {
+ case INTEGER:
+ intCoder.encode(value.getInteger(idx), outStream);
+ break;
+ case SMALLINT:
+ intCoder.encode((int) value.getShort(idx), outStream);
+ break;
+ case TINYINT:
+ byteCoder.encode(value.getByte(idx), outStream);
+ break;
+ case DOUBLE:
+ doubleCoder.encode(value.getDouble(idx), outStream);
+ break;
+ case FLOAT:
+ doubleCoder.encode((double) value.getFloat(idx), outStream);
+ break;
+ case DECIMAL:
+ bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
+ break;
+ case BIGINT:
+ longCoder.encode(value.getLong(idx), outStream);
+ break;
+ case VARCHAR:
+ case CHAR:
+ stringCoder.encode(value.getString(idx), outStream);
+ break;
+ case TIME:
+ longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
+ break;
+ case DATE:
+ case TIMESTAMP:
+ longCoder.encode(value.getDate(idx).getTime(), outStream);
+ break;
+ case BOOLEAN:
+ byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream);
+ break;
+
+ default:
+ throw new UnsupportedOperationException(
+ "Data type: " + value.getDataType().getFieldsType().get(idx) + " not supported yet!");
+ }
+ }
+
+ instantCoder.encode(value.getWindowStart(), outStream);
+ instantCoder.encode(value.getWindowEnd(), outStream);
+ }
+
+ @Override
+ public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
+ List<Integer> nullFields = listCoder.decode(inStream);
+
+ BeamSqlRow record = new BeamSqlRow(tableSchema);
+ record.setNullFields(nullFields);
+ for (int idx = 0; idx < tableSchema.size(); ++idx) {
+ if (nullFields.contains(idx)) {
+ continue;
+ }
+
+ switch (CalciteUtils.getFieldType(tableSchema, idx)) {
+ case INTEGER:
+ record.addField(idx, intCoder.decode(inStream));
+ break;
+ case SMALLINT:
+ record.addField(idx, intCoder.decode(inStream).shortValue());
+ break;
+ case TINYINT:
+ record.addField(idx, byteCoder.decode(inStream));
+ break;
+ case DOUBLE:
+ record.addField(idx, doubleCoder.decode(inStream));
+ break;
+ case FLOAT:
+ record.addField(idx, doubleCoder.decode(inStream).floatValue());
+ break;
+ case BIGINT:
+ record.addField(idx, longCoder.decode(inStream));
+ break;
+ case DECIMAL:
+ record.addField(idx, bigDecimalCoder.decode(inStream));
+ break;
+ case VARCHAR:
+ case CHAR:
+ record.addField(idx, stringCoder.decode(inStream));
+ break;
+ case TIME:
+ GregorianCalendar calendar = new GregorianCalendar();
+ calendar.setTime(new Date(longCoder.decode(inStream)));
+ record.addField(idx, calendar);
+ break;
+ case DATE:
+ case TIMESTAMP:
+ record.addField(idx, new Date(longCoder.decode(inStream)));
+ break;
+ case BOOLEAN:
+ record.addField(idx, byteCoder.decode(inStream) == 1);
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Data type: "
+ + CalciteUtils.toCalciteType(tableSchema.getFieldsType().get(idx))
+ + " not supported yet!");
+ }
+ }
+
+ record.setWindowStart(instantCoder.decode(inStream));
+ record.setWindowEnd(instantCoder.decode(inStream));
+
+ return record;
+ }
+
+ public BeamSqlRowType getTableSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public void verifyDeterministic()
+ throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
new file mode 100644
index 0000000..018fe81
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Field type information in {@link BeamSqlRow}.
+ *
+ */
+@AutoValue
+public abstract class BeamSqlRowType implements Serializable {
+ public abstract List<String> getFieldsName();
+ public abstract List<Integer> getFieldsType();
+
+ public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
+ return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
+ }
+
+ public int size() {
+ return getFieldsName().size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
new file mode 100644
index 0000000..c179935
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * This interface defines a Beam Sql Table.
+ */
+public interface BeamSqlTable {
+ /**
+ * In Beam SQL, there's no difference between a batch query and a streaming
+ * query. {@link BeamIOType} is used to validate the sources.
+ */
+ BeamIOType getSourceType();
+
+ /**
+ * create a {@code PCollection<BeamSqlRow>} from source.
+ *
+ */
+ PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
+
+ /**
+ * create a {@code IO.write()} instance to write to target.
+ *
+ */
+ PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
+
+ /**
+ * Get the schema info of the table.
+ */
+ BeamSqlRowType getRowType();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java
new file mode 100644
index 0000000..2f78586
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+import java.lang.reflect.ParameterizedType;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * abstract class of aggregation functions in Beam SQL.
+ *
+ * <p>There're several constrains for a UDAF:<br>
+ * 1. A constructor with an empty argument list is required;<br>
+ * 2. The type of {@code InputT} and {@code OutputT} can only be Interger/Long/Short/Byte/Double
+ * /Float/Date/BigDecimal, mapping as SQL type INTEGER/BIGINT/SMALLINT/TINYINE/DOUBLE/FLOAT
+ * /TIMESTAMP/DECIMAL;<br>
+ * 3. Keep intermediate data in {@code AccumT}, and do not rely on elements in class;<br>
+ */
+public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
+ public BeamSqlUdaf(){}
+
+ /**
+ * create an initial aggregation object, equals to {@link CombineFn#createAccumulator()}.
+ */
+ public abstract AccumT init();
+
+ /**
+ * add an input value, equals to {@link CombineFn#addInput(Object, Object)}.
+ */
+ public abstract AccumT add(AccumT accumulator, InputT input);
+
+ /**
+ * merge aggregation objects from parallel tasks, equals to
+ * {@link CombineFn#mergeAccumulators(Iterable)}.
+ */
+ public abstract AccumT merge(Iterable<AccumT> accumulators);
+
+ /**
+ * extract output value from aggregation object, equals to
+ * {@link CombineFn#extractOutput(Object)}.
+ */
+ public abstract OutputT result(AccumT accumulator);
+
+ /**
+ * get the coder for AccumT which stores the intermediate result.
+ * By default it's fetched from {@link CoderRegistry}.
+ */
+ public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry)
+ throws CannotProvideCoderException {
+ return registry.getCoder(
+ (Class<AccumT>) ((ParameterizedType) getClass()
+ .getGenericSuperclass()).getActualTypeArguments()[1]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
new file mode 100644
index 0000000..191b78e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Interface to create a UDF in Beam SQL.
+ *
+ * <p>A static method {@code eval} is required. Here is an example:
+ *
+ * <blockquote><pre>
+ * public static class MyLeftFunction {
+ * public String eval(
+ * @Parameter(name = "s") String s,
+ * @Parameter(name = "n", optional = true) Integer n) {
+ * return s.substring(0, n == null ? 1 : n);
+ * }
+ * }</pre></blockquote>
+ *
+ * <p>The first parameter is named "s" and is mandatory,
+ * and the second parameter is named "n" and is optional.
+ */
+public interface BeamSqlUdf extends Serializable {
+ String UDF_METHOD = "eval";
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
new file mode 100644
index 0000000..53e8483
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.math.BigDecimal;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * Utility methods for working with {@code BeamTable}.
+ */
+public final class BeamTableUtils {
+ public static BeamSqlRow csvLine2BeamSqlRow(
+ CSVFormat csvFormat,
+ String line,
+ BeamSqlRowType beamSqlRowType) {
+ BeamSqlRow row = new BeamSqlRow(beamSqlRowType);
+ try (StringReader reader = new StringReader(line)) {
+ CSVParser parser = csvFormat.parse(reader);
+ CSVRecord rawRecord = parser.getRecords().get(0);
+
+ if (rawRecord.size() != beamSqlRowType.size()) {
+ throw new IllegalArgumentException(String.format(
+ "Expect %d fields, but actually %d",
+ beamSqlRowType.size(), rawRecord.size()
+ ));
+ } else {
+ for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
+ String raw = rawRecord.get(idx);
+ addFieldWithAutoTypeCasting(row, idx, raw);
+ }
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException("decodeRecord failed!", e);
+ }
+ return row;
+ }
+
+ public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) {
+ StringWriter writer = new StringWriter();
+ try (CSVPrinter printer = csvFormat.print(writer)) {
+ for (int i = 0; i < row.size(); i++) {
+ printer.print(row.getFieldValue(i).toString());
+ }
+ printer.println();
+ } catch (IOException e) {
+ throw new IllegalArgumentException("encodeRecord failed!", e);
+ }
+ return writer.toString();
+ }
+
+ public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
+ if (rawObj == null) {
+ row.addField(idx, null);
+ return;
+ }
+
+ SqlTypeName columnType = CalciteUtils.getFieldType(row.getDataType(), idx);
+ // auto-casting for numberics
+ if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
+ || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
+ String raw = rawObj.toString();
+ switch (columnType) {
+ case TINYINT:
+ row.addField(idx, Byte.valueOf(raw));
+ break;
+ case SMALLINT:
+ row.addField(idx, Short.valueOf(raw));
+ break;
+ case INTEGER:
+ row.addField(idx, Integer.valueOf(raw));
+ break;
+ case BIGINT:
+ row.addField(idx, Long.valueOf(raw));
+ break;
+ case FLOAT:
+ row.addField(idx, Float.valueOf(raw));
+ break;
+ case DOUBLE:
+ row.addField(idx, Double.valueOf(raw));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Column type %s is not supported yet!", columnType));
+ }
+ } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
+ // convert NlsString to String
+ if (rawObj instanceof NlsString) {
+ row.addField(idx, ((NlsString) rawObj).getValue());
+ } else {
+ row.addField(idx, rawObj);
+ }
+ } else {
+ // keep the origin
+ row.addField(idx, rawObj);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
new file mode 100644
index 0000000..2a50947
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * A Kafka topic that saves records as CSV format.
+ *
+ */
+public class BeamKafkaCSVTable extends BeamKafkaTable {
+ private CSVFormat csvFormat;
+ public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+ List<String> topics) {
+ this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
+ }
+
+ public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+ List<String> topics, CSVFormat format) {
+ super(beamSqlRowType, bootstrapServers, topics);
+ this.csvFormat = format;
+ }
+
+ @Override
+ public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+ getPTransformForInput() {
+ return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
+ }
+
+ @Override
+ public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+ getPTransformForOutput() {
+ return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
+ }
+
+ /**
+ * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}.
+ *
+ */
+ public static class CsvRecorderDecoder
+ extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
+ private BeamSqlRowType rowType;
+ private CSVFormat format;
+ public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) {
+ this.rowType = rowType;
+ this.format = format;
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) {
+ return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ String rowInString = new String(c.element().getValue());
+ c.output(BeamTableUtils.csvLine2BeamSqlRow(format, rowInString, rowType));
+ }
+ }));
+ }
+ }
+
+ /**
+ * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}.
+ *
+ */
+ public static class CsvRecorderEncoder
+ extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
+ private BeamSqlRowType rowType;
+ private CSVFormat format;
+ public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) {
+ this.rowType = rowType;
+ this.format = format;
+ }
+
+ @Override
+ public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) {
+ return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ BeamSqlRow in = c.element();
+ c.output(KV.of(new byte[] {}, BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes()));
+ }
+ }));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
new file mode 100644
index 0000000..2cc664f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+/**
+ * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
+ * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
+ *
+ */
+public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
+
+ private String bootstrapServers;
+ private List<String> topics;
+ private Map<String, Object> configUpdates;
+
+ protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) {
+ super(beamSqlRowType);
+ }
+
+ public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+ List<String> topics) {
+ super(beamSqlRowType);
+ this.bootstrapServers = bootstrapServers;
+ this.topics = topics;
+ }
+
+ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
+ this.configUpdates = configUpdates;
+ return this;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.UNBOUNDED;
+ }
+
+ public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+ getPTransformForInput();
+
+ public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+ getPTransformForOutput();
+
+ @Override
+ public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ return PBegin.in(pipeline).apply("read",
+ KafkaIO.<byte[], byte[]>read()
+ .withBootstrapServers(bootstrapServers)
+ .withTopics(topics)
+ .updateConsumerProperties(configUpdates)
+ .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+ .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+ .withoutMetadata())
+ .apply("in_format", getPTransformForInput());
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ checkArgument(topics != null && topics.size() == 1,
+ "Only one topic can be acceptable as output.");
+
+ return new PTransform<PCollection<BeamSqlRow>, PDone>() {
+ @Override
+ public PDone expand(PCollection<BeamSqlRow> input) {
+ return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
+ KafkaIO.<byte[], byte[]>write()
+ .withBootstrapServers(bootstrapServers)
+ .withTopic(topics.get(0))
+ .withKeySerializer(ByteArraySerializer.class)
+ .withValueSerializer(ByteArraySerializer.class));
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
new file mode 100644
index 0000000..f0ddeb6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * table schema for KafkaIO.
+ */
+package org.apache.beam.sdk.extensions.sql.schema.kafka;