You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:55 UTC

[32/59] beam git commit: rename package org.apache.beam.dsls.sql to org.apache.beam.sdk.extensions.sql

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
new file mode 100644
index 0000000..fb0a8e2
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BeamSQL-specific nodes, to replace {@link org.apache.calcite.rel.RelNode}.
+ *
+ */
+package org.apache.beam.sdk.extensions.sql.rel;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
new file mode 100644
index 0000000..17e3f80
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import com.google.common.collect.ImmutableList;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.rel.BeamAggregationRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.joda.time.Duration;
+
+/**
+ * Rule to detect the window/trigger settings of an {@link Aggregate} over a {@link Project}
+ * and transform the aggregate into a {@link BeamAggregationRel}.
+ */
+public class BeamAggregationRule extends RelOptRule {
+  public static final BeamAggregationRule INSTANCE =
+      new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
+  // Builds a rule whose operand is an aggregate of the given class over a project input.
+  public BeamAggregationRule(
+      Class<? extends Aggregate> aggregateClass,
+      Class<? extends Project> projectClass,
+      RelBuilderFactory relBuilderFactory) {
+    super(
+        operand(aggregateClass,
+            operand(projectClass, any())),
+        relBuilderFactory, null);
+  }
+  // Builds a rule from a caller-supplied operand and description.
+  public BeamAggregationRule(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate aggregate = call.rel(0); // the matched aggregate
+    final Project project = call.rel(1); // the project feeding the aggregate
+    updateWindowTrigger(call, aggregate, project);
+  }
+  // Derives window, trigger and allowed lateness from the group-by calls, then transforms.
+  private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate,
+      Project project) {
+    ImmutableBitSet groupByFields = aggregate.getGroupSet();
+    List<RexNode> projectMapping = project.getProjects();
+    // Defaults used when no TUMBLE/HOP/SESSION call appears among the group-by fields.
+    WindowFn windowFn = new GlobalWindows();
+    Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow());
+    int windowFieldIdx = -1;
+    Duration allowedLatence = Duration.ZERO;
+    // Inspect each group-by field; a call named TUMBLE/HOP/SESSION selects the window.
+    for (int groupField : groupByFields.asList()) {
+      RexNode projNode = projectMapping.get(groupField);
+      if (projNode instanceof RexCall) {
+        SqlOperator op = ((RexCall) projNode).op;
+        ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
+        String functionName = op.getName();
+        switch (functionName) {
+        case "TUMBLE":
+          windowFieldIdx = groupField;
+          windowFn = FixedWindows
+              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
+          if (parameters.size() == 3) { // optional trailing operand carries the delay
+            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+                .getValue();
+            triggerFn = createTriggerWithDelay(delayTime);
+            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+          }
+          break;
+        case "HOP":
+          windowFieldIdx = groupField;
+          windowFn = SlidingWindows
+              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))))
+              .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2))));
+          if (parameters.size() == 4) { // optional trailing operand carries the delay
+            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
+                .getValue();
+            triggerFn = createTriggerWithDelay(delayTime);
+            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+          }
+          break;
+        case "SESSION":
+          windowFieldIdx = groupField;
+          windowFn = Sessions
+              .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
+          if (parameters.size() == 3) { // optional trailing operand carries the delay
+            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
+                .getValue();
+            triggerFn = createTriggerWithDelay(delayTime);
+            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
+          }
+          break;
+        default:
+          break;
+        }
+      }
+    }
+    // Replace the matched aggregate with a Beam aggregation over the converted input.
+    BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(),
+        aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(aggregate.getInput(),
+            aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        aggregate.indicator,
+        aggregate.getGroupSet(),
+        aggregate.getGroupSets(),
+        aggregate.getAggCallList(),
+        windowFn,
+        triggerFn,
+        windowFieldIdx,
+        allowedLatence);
+    call.transformTo(newAggregator);
+  }
+  // Repeating watermark trigger whose late firings are delayed by delayTime.getTimeInMillis().
+  private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
+    return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
+        .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
+  }
+  // Reads a window parameter from a literal; throws IllegalArgumentException otherwise.
+  private long getWindowParameterAsMillis(RexNode parameterNode) {
+    if (parameterNode instanceof RexLiteral) {
+      return RexLiteral.intValue(parameterNode); // NOTE(review): int-valued read; confirm range
+    } else {
+      throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
new file mode 100644
index 0000000..b30a9d9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+
+/**
+ * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
+ * Matches {@link LogicalFilter} nodes in {@code Convention.NONE}.
+ */
+public class BeamFilterRule extends ConverterRule {
+  public static final BeamFilterRule INSTANCE = new BeamFilterRule();
+
+  private BeamFilterRule() {
+    super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Filter filter = (Filter) rel;
+    final RelNode input = filter.getInput();
+    // The input is converted to the Beam convention as well; the condition is reused as-is.
+    return new BeamFilterRel(filter.getCluster(),
+        filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        filter.getCondition());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
new file mode 100644
index 0000000..54079b0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.rel.BeamIOSinkRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableModify} with
+ * {@link BeamIOSinkRel}.
+ * Only INSERT on a TABLE or STREAM table type is converted; anything else throws.
+ */
+public class BeamIOSinkRule extends ConverterRule {
+  public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
+
+  private BeamIOSinkRule() {
+    super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamIOSinkRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final TableModify tableModify = (TableModify) rel;
+    final RelNode input = tableModify.getInput();
+    // Collect the pieces of the original node needed to rebuild it in the Beam convention.
+    final RelOptCluster cluster = tableModify.getCluster();
+    final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
+    final RelOptTable relOptTable = tableModify.getTable();
+    final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+    final RelNode convertedInput = convert(input,
+        input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+    final TableModify.Operation operation = tableModify.getOperation();
+    final List<String> updateColumnList = tableModify.getUpdateColumnList();
+    final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+    final boolean flattened = tableModify.isFlattened();
+
+    final Table table = tableModify.getTable().unwrap(Table.class);
+    // TABLE falls through to STREAM: both accept INSERT only.
+    switch (table.getJdbcTableType()) {
+    case TABLE:
+    case STREAM:
+      if (operation != TableModify.Operation.INSERT) {
+        throw new UnsupportedOperationException(
+            String.format("Streams doesn't support %s modify operation", operation));
+      }
+      return new BeamIOSinkRel(cluster, traitSet,
+          relOptTable, catalogReader, convertedInput, operation, updateColumnList,
+          sourceExpressionList, flattened);
+    default:
+      throw new IllegalArgumentException(
+          String.format("Unsupported table type: %s", table.getJdbcTableType()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
new file mode 100644
index 0000000..496b977
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamIOSourceRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableScan} with
+ * {@link BeamIOSourceRel}.
+ * Matches {@link LogicalTableScan} nodes in {@code Convention.NONE}.
+ */
+public class BeamIOSourceRule extends ConverterRule {
+  public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
+
+  private BeamIOSourceRule() {
+    super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamIOSourceRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final TableScan scan = (TableScan) rel;
+    // Reuse the scan's cluster and table; the trait set gets the Beam convention.
+    return new BeamIOSourceRel(scan.getCluster(),
+        scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
new file mode 100644
index 0000000..6fdbd9b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.rel.BeamIntersectRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+
+/**
+ * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}.
+ */
+public class BeamIntersectRule extends ConverterRule {
+  public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();
+  private BeamIntersectRule() {
+    super(LogicalIntersect.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamIntersectRule");
+  }
+  // Converts every input to the Beam convention and passes the original "all" flag through.
+  @Override public RelNode convert(RelNode rel) {
+    Intersect intersect = (Intersect) rel;
+    final List<RelNode> inputs = intersect.getInputs();
+    return new BeamIntersectRel(
+        intersect.getCluster(),
+        intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convertList(inputs, BeamLogicalConvention.INSTANCE),
+        intersect.all
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
new file mode 100644
index 0000000..147932e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamJoinRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+/**
+ * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
+ */
+public class BeamJoinRule extends ConverterRule {
+  public static final BeamJoinRule INSTANCE = new BeamJoinRule();
+  private BeamJoinRule() {
+    super(LogicalJoin.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamJoinRule");
+  }
+  // Converts both sides to the Beam convention; condition, variables and join type are kept.
+  @Override public RelNode convert(RelNode rel) {
+    Join join = (Join) rel;
+    return new BeamJoinRel(
+        join.getCluster(),
+        join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(join.getLeft(),
+            join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        convert(join.getRight(),
+            join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        join.getCondition(),
+        join.getVariablesSet(),
+        join.getJoinType()
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java
new file mode 100644
index 0000000..363cf3b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamMinusRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.logical.LogicalMinus;
+
+/**
+ * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}.
+ */
+public class BeamMinusRule extends ConverterRule {
+  public static final BeamMinusRule INSTANCE = new BeamMinusRule();
+  private BeamMinusRule() {
+    super(LogicalMinus.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamMinusRule");
+  }
+  // Converts every input to the Beam convention and passes the original "all" flag through.
+  @Override public RelNode convert(RelNode rel) {
+    Minus minus = (Minus) rel;
+    final List<RelNode> inputs = minus.getInputs();
+    return new BeamMinusRel(
+        minus.getCluster(),
+        minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convertList(inputs, BeamLogicalConvention.INSTANCE),
+        minus.all
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java
new file mode 100644
index 0000000..4f2f8c9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+
+/**
+ * A {@code ConverterRule} to replace {@link Project} with
+ * {@link BeamProjectRel}.
+ * Matches {@link LogicalProject} nodes in {@code Convention.NONE}.
+ */
+public class BeamProjectRule extends ConverterRule {
+  public static final BeamProjectRule INSTANCE = new BeamProjectRule();
+
+  private BeamProjectRule() {
+    super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
+  }
+
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Project project = (Project) rel;
+    final RelNode input = project.getInput();
+    // The input is converted to the Beam convention; projections and row type are reused.
+    return new BeamProjectRel(project.getCluster(),
+        project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        project.getProjects(), project.getRowType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java
new file mode 100644
index 0000000..e104d37
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamSortRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalSort;
+
+/**
+ * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}.
+ */
+public class BeamSortRule extends ConverterRule {
+  public static final BeamSortRule INSTANCE = new BeamSortRule();
+  private BeamSortRule() {
+    super(LogicalSort.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamSortRule");
+  }
+  // Converts the input to the Beam convention; collation, offset and fetch are carried over.
+  @Override public RelNode convert(RelNode rel) {
+    Sort sort = (Sort) rel;
+    final RelNode input = sort.getInput();
+    return new BeamSortRel(
+        sort.getCluster(),
+        sort.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        sort.getCollation(),
+        sort.offset,
+        sort.fetch
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java
new file mode 100644
index 0000000..975ccbc
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamUnionRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalUnion;
+
+/**
+ * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with
+ * {@link BeamUnionRel}.
+ */
+public class BeamUnionRule extends ConverterRule {
+  /** The only instance of this rule; the constructor is private. */
+  public static final BeamUnionRule INSTANCE = new BeamUnionRule();
+
+  private BeamUnionRule() {
+    super(
+        LogicalUnion.class,
+        Convention.NONE,
+        BeamLogicalConvention.INSTANCE,
+        "BeamUnionRule");
+  }
+
+  /**
+   * Creates the {@code BeamUnionRel} for the given union node.
+   *
+   * <p>All inputs are converted to {@code BeamLogicalConvention} and the {@code all} flag of
+   * the union is passed on unchanged.
+   *
+   * @param rel the node to convert; it is cast to {@code Union}
+   * @return the new {@code BeamUnionRel}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Union logicalUnion = (Union) rel;
+    return new BeamUnionRel(
+        logicalUnion.getCluster(),
+        logicalUnion.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convertList(logicalUnion.getInputs(), BeamLogicalConvention.INSTANCE),
+        logicalUnion.all);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java
new file mode 100644
index 0000000..86a8f72
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamValuesRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.logical.LogicalValues;
+
+/**
+ * {@code ConverterRule} that turns a {@code LogicalValues} in {@code Convention.NONE} into a
+ * {@code BeamValuesRel} in {@code BeamLogicalConvention}.
+ */
+public class BeamValuesRule extends ConverterRule {
+  /** The only instance of this rule; the constructor is private. */
+  public static final BeamValuesRule INSTANCE = new BeamValuesRule();
+
+  private BeamValuesRule() {
+    super(
+        LogicalValues.class,
+        Convention.NONE,
+        BeamLogicalConvention.INSTANCE,
+        "BeamValuesRule");
+  }
+
+  /**
+   * Creates the {@code BeamValuesRel} for the given values node, reusing its cluster, row
+   * type and tuples.
+   *
+   * @param rel the node to convert; it is cast to {@code Values}
+   * @return the new {@code BeamValuesRel}
+   */
+  @Override
+  public RelNode convert(RelNode rel) {
+    final Values logicalValues = (Values) rel;
+    return new BeamValuesRel(
+        logicalValues.getCluster(),
+        logicalValues.getRowType(),
+        logicalValues.getTuples(),
+        logicalValues.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
new file mode 100644
index 0000000..f57cdee
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.calcite.plan.RelOptRule} to generate
+ * {@link org.apache.beam.sdk.extensions.sql.rel.BeamRelNode}.
+ */
+package org.apache.beam.sdk.extensions.sql.rule;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
new file mode 100644
index 0000000..bf41c95
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Common base of the Beam SQL tables: each IO describes its table schema by extending
+ * {@link BaseBeamTable}, which stores the row type and serves it through
+ * {@link #getRowType()}.
+ */
+public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
+  /** Row type handed to the constructor. */
+  protected BeamSqlRowType beamSqlRowType;
+
+  /**
+   * @param beamSqlRowType the row type this table reports from {@link #getRowType()}
+   */
+  public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
+    this.beamSqlRowType = beamSqlRowType;
+  }
+
+  /** Returns the row type given at construction. */
+  @Override
+  public BeamSqlRowType getRowType() {
+    return this.beamSqlRowType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
new file mode 100644
index 0000000..bda3ca1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Type as a source IO, determined whether it's a STREAMING process, or batch
+ * process.
+ */
+public enum BeamIOType implements Serializable {
+  BOUNDED, UNBOUNDED;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
new file mode 100644
index 0000000..5bbb8fd
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * {@code BeamPCollectionTable} exposes a {@code PCollection<BeamSqlRow>} as a virtual table,
+ * so that a downstream query can read from it directly.
+ *
+ * <p>The table is read-only: {@link #buildIOWriter()} always throws.
+ */
+public class BeamPCollectionTable extends BaseBeamTable {
+  private BeamIOType ioType;
+  // transient: not written when the table itself is serialized
+  private transient PCollection<BeamSqlRow> upstream;
+
+  /** Creates a table that has a row type only; source type and upstream stay unset. */
+  protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  /**
+   * Creates a table backed by {@code upstream}.
+   *
+   * @param upstream the collection returned by {@link #buildIOReader(Pipeline)}; its
+   *     boundedness decides the reported {@link BeamIOType}
+   * @param beamSqlRowType the row type of the table
+   */
+  public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
+      BeamSqlRowType beamSqlRowType) {
+    this(beamSqlRowType);
+    if (upstream.isBounded().equals(IsBounded.BOUNDED)) {
+      this.ioType = BeamIOType.BOUNDED;
+    } else {
+      this.ioType = BeamIOType.UNBOUNDED;
+    }
+    this.upstream = upstream;
+  }
+
+  /** Returns the source type derived from the upstream collection at construction. */
+  @Override
+  public BeamIOType getSourceType() {
+    return this.ioType;
+  }
+
+  /** Returns the wrapped upstream collection; {@code pipeline} is not used. */
+  @Override
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    return this.upstream;
+  }
+
+  /**
+   * Not supported, this table can only be read from.
+   *
+   * @throws IllegalArgumentException always
+   */
+  @Override
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+    throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
new file mode 100644
index 0000000..616e9f3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Instant;
+
+/**
+ * Represent a generic ROW record in Beam SQL.
+ *
+ */
+public class BeamSqlRow implements Serializable {
+  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+  static {
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+  }
+
+  private List<Integer> nullFields = new ArrayList<>();
+  private List<Object> dataValues;
+  private BeamSqlRowType dataType;
+
+  private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+  private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+
+  public BeamSqlRow(BeamSqlRowType dataType) {
+    this.dataType = dataType;
+    this.dataValues = new ArrayList<>();
+    for (int idx = 0; idx < dataType.size(); ++idx) {
+      dataValues.add(null);
+      nullFields.add(idx);
+    }
+  }
+
+  public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
+    this(dataType);
+    for (int idx = 0; idx < dataValues.size(); ++idx) {
+      addField(idx, dataValues.get(idx));
+    }
+  }
+
+  public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){
+    windowStart = upstreamRecord.windowStart;
+    windowEnd = upstreamRecord.windowEnd;
+
+    if (window instanceof IntervalWindow) {
+      IntervalWindow iWindow = (IntervalWindow) window;
+      windowStart = iWindow.start();
+      windowEnd = iWindow.end();
+    }
+  }
+
+  public void addField(String fieldName, Object fieldValue) {
+    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+  }
+
+  public void addField(int index, Object fieldValue) {
+    if (fieldValue == null) {
+      return;
+    } else {
+      if (nullFields.contains(index)) {
+        nullFields.remove(nullFields.indexOf(index));
+      }
+    }
+
+    validateValueType(index, fieldValue);
+    dataValues.set(index, fieldValue);
+  }
+
+  private void validateValueType(int index, Object fieldValue) {
+    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
+    Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
+    if (javaClazz == null) {
+      throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
+    }
+
+    if (!fieldValue.getClass().equals(javaClazz)) {
+      throw new IllegalArgumentException(
+          String.format("[%s](%s) doesn't match type [%s]",
+              fieldValue, fieldValue.getClass(), fieldType)
+      );
+    }
+  }
+
+  public Object getFieldValue(String fieldName) {
+    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+  }
+
+  public byte getByte(String fieldName) {
+    return (Byte) getFieldValue(fieldName);
+  }
+
+  public short getShort(String fieldName) {
+    return (Short) getFieldValue(fieldName);
+  }
+
+  public int getInteger(String fieldName) {
+    return (Integer) getFieldValue(fieldName);
+  }
+
+  public float getFloat(String fieldName) {
+    return (Float) getFieldValue(fieldName);
+  }
+
+  public double getDouble(String fieldName) {
+    return (Double) getFieldValue(fieldName);
+  }
+
+  public long getLong(String fieldName) {
+    return (Long) getFieldValue(fieldName);
+  }
+
+  public String getString(String fieldName) {
+    return (String) getFieldValue(fieldName);
+  }
+
+  public Date getDate(String fieldName) {
+    return (Date) getFieldValue(fieldName);
+  }
+
+  public GregorianCalendar getGregorianCalendar(String fieldName) {
+    return (GregorianCalendar) getFieldValue(fieldName);
+  }
+
+  public BigDecimal getBigDecimal(String fieldName) {
+    return (BigDecimal) getFieldValue(fieldName);
+  }
+
+  public boolean getBoolean(String fieldName) {
+    return (boolean) getFieldValue(fieldName);
+  }
+
+  public Object getFieldValue(int fieldIdx) {
+    if (nullFields.contains(fieldIdx)) {
+      return null;
+    }
+
+    return dataValues.get(fieldIdx);
+  }
+
+  public byte getByte(int idx) {
+    return (Byte) getFieldValue(idx);
+  }
+
+  public short getShort(int idx) {
+    return (Short) getFieldValue(idx);
+  }
+
+  public int getInteger(int idx) {
+    return (Integer) getFieldValue(idx);
+  }
+
+  public float getFloat(int idx) {
+    return (Float) getFieldValue(idx);
+  }
+
+  public double getDouble(int idx) {
+    return (Double) getFieldValue(idx);
+  }
+
+  public long getLong(int idx) {
+    return (Long) getFieldValue(idx);
+  }
+
+  public String getString(int idx) {
+    return (String) getFieldValue(idx);
+  }
+
+  public Date getDate(int idx) {
+    return (Date) getFieldValue(idx);
+  }
+
+  public GregorianCalendar getGregorianCalendar(int idx) {
+    return (GregorianCalendar) getFieldValue(idx);
+  }
+
+  public BigDecimal getBigDecimal(int idx) {
+    return (BigDecimal) getFieldValue(idx);
+  }
+
+  public boolean getBoolean(int idx) {
+    return (boolean) getFieldValue(idx);
+  }
+
+  public int size() {
+    return dataValues.size();
+  }
+
+  public List<Object> getDataValues() {
+    return dataValues;
+  }
+
+  public void setDataValues(List<Object> dataValues) {
+    this.dataValues = dataValues;
+  }
+
+  public BeamSqlRowType getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(BeamSqlRowType dataType) {
+    this.dataType = dataType;
+  }
+
+  public void setNullFields(List<Integer> nullFields) {
+    this.nullFields = nullFields;
+  }
+
+  public List<Integer> getNullFields() {
+    return nullFields;
+  }
+
+  /**
+   * is the specified field NULL?
+   */
+  public boolean isNull(int idx) {
+    return nullFields.contains(idx);
+  }
+
+  public Instant getWindowStart() {
+    return windowStart;
+  }
+
+  public Instant getWindowEnd() {
+    return windowEnd;
+  }
+
+  public void setWindowStart(Instant windowStart) {
+    this.windowStart = windowStart;
+  }
+
+  public void setWindowEnd(Instant windowEnd) {
+    this.windowEnd = windowEnd;
+  }
+
+  @Override
+  public String toString() {
+    return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
+        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
+  }
+
+  /**
+   * Return data fields as key=value.
+   */
+  public String valueInString() {
+    StringBuilder sb = new StringBuilder();
+    for (int idx = 0; idx < size(); ++idx) {
+      sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
+    }
+    return sb.substring(1);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    BeamSqlRow other = (BeamSqlRow) obj;
+    return toString().equals(other.toString());
+  }
+
+  @Override public int hashCode() {
+    return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
new file mode 100644
index 0000000..39e2fd3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+
+/**
+ *  A {@link Coder} encodes {@link BeamSqlRow}.
+ */
+public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
+  private BeamSqlRowType tableSchema;
+
+  private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
+
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
+  private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+  private static final DoubleCoder doubleCoder = DoubleCoder.of();
+  private static final InstantCoder instantCoder = InstantCoder.of();
+  private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
+  private static final ByteCoder byteCoder = ByteCoder.of();
+
+  public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  @Override
+  public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
+    listCoder.encode(value.getNullFields(), outStream);
+    for (int idx = 0; idx < value.size(); ++idx) {
+      if (value.getNullFields().contains(idx)) {
+        continue;
+      }
+
+      switch (CalciteUtils.getFieldType(value.getDataType(), idx)) {
+        case INTEGER:
+          intCoder.encode(value.getInteger(idx), outStream);
+          break;
+        case SMALLINT:
+          intCoder.encode((int) value.getShort(idx), outStream);
+          break;
+        case TINYINT:
+          byteCoder.encode(value.getByte(idx), outStream);
+          break;
+        case DOUBLE:
+          doubleCoder.encode(value.getDouble(idx), outStream);
+          break;
+        case FLOAT:
+          doubleCoder.encode((double) value.getFloat(idx), outStream);
+          break;
+        case DECIMAL:
+          bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
+          break;
+        case BIGINT:
+          longCoder.encode(value.getLong(idx), outStream);
+          break;
+        case VARCHAR:
+        case CHAR:
+          stringCoder.encode(value.getString(idx), outStream);
+          break;
+        case TIME:
+          longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
+          break;
+        case DATE:
+        case TIMESTAMP:
+          longCoder.encode(value.getDate(idx).getTime(), outStream);
+          break;
+        case BOOLEAN:
+          byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream);
+          break;
+
+        default:
+          throw new UnsupportedOperationException(
+              "Data type: " + value.getDataType().getFieldsType().get(idx) + " not supported yet!");
+      }
+    }
+
+    instantCoder.encode(value.getWindowStart(), outStream);
+    instantCoder.encode(value.getWindowEnd(), outStream);
+  }
+
+  @Override
+  public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
+    List<Integer> nullFields = listCoder.decode(inStream);
+
+    BeamSqlRow record = new BeamSqlRow(tableSchema);
+    record.setNullFields(nullFields);
+    for (int idx = 0; idx < tableSchema.size(); ++idx) {
+      if (nullFields.contains(idx)) {
+        continue;
+      }
+
+      switch (CalciteUtils.getFieldType(tableSchema, idx)) {
+        case INTEGER:
+          record.addField(idx, intCoder.decode(inStream));
+          break;
+        case SMALLINT:
+          record.addField(idx, intCoder.decode(inStream).shortValue());
+          break;
+        case TINYINT:
+          record.addField(idx, byteCoder.decode(inStream));
+          break;
+        case DOUBLE:
+          record.addField(idx, doubleCoder.decode(inStream));
+          break;
+        case FLOAT:
+          record.addField(idx, doubleCoder.decode(inStream).floatValue());
+          break;
+        case BIGINT:
+          record.addField(idx, longCoder.decode(inStream));
+          break;
+        case DECIMAL:
+          record.addField(idx, bigDecimalCoder.decode(inStream));
+          break;
+        case VARCHAR:
+        case CHAR:
+          record.addField(idx, stringCoder.decode(inStream));
+          break;
+        case TIME:
+          GregorianCalendar calendar = new GregorianCalendar();
+          calendar.setTime(new Date(longCoder.decode(inStream)));
+          record.addField(idx, calendar);
+          break;
+        case DATE:
+        case TIMESTAMP:
+          record.addField(idx, new Date(longCoder.decode(inStream)));
+          break;
+        case BOOLEAN:
+          record.addField(idx, byteCoder.decode(inStream) == 1);
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Data type: "
+              + CalciteUtils.toCalciteType(tableSchema.getFieldsType().get(idx))
+              + " not supported yet!");
+      }
+    }
+
+    record.setWindowStart(instantCoder.decode(inStream));
+    record.setWindowEnd(instantCoder.decode(inStream));
+
+    return record;
+  }
+
+  public BeamSqlRowType getTableSchema() {
+    return tableSchema;
+  }
+
+  @Override
+  public void verifyDeterministic()
+      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
new file mode 100644
index 0000000..018fe81
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Field type information in {@link BeamSqlRow}: the names of the fields and, in a second
+ * list, their types.
+ */
+@AutoValue
+public abstract class BeamSqlRowType implements Serializable {
+  /** Names of the fields. */
+  public abstract List<String> getFieldsName();
+
+  /** Integer type codes of the fields (presumably {@code java.sql.Types} values). */
+  public abstract List<Integer> getFieldsType();
+
+  /**
+   * Creates a row type from the given lists; the lists are stored as passed in.
+   *
+   * @param fieldNames names of the fields
+   * @param fieldTypes type codes of the fields
+   */
+  public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
+    return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
+  }
+
+  /** Returns the number of fields, i.e. the number of field names. */
+  public int size() {
+    final List<String> names = getFieldsName();
+    return names.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
new file mode 100644
index 0000000..c179935
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * Contract of a table that can be used in Beam SQL: it exposes its schema and its
+ * {@link BeamIOType}, and builds the Beam transforms used to read from and write to it.
+ */
+public interface BeamSqlTable {
+  /**
+   * Returns the {@link BeamIOType} of this table.
+   *
+   * <p>Beam SQL itself makes no difference between a batch query and a streaming query;
+   * the type is used to validate the sources.
+   */
+  BeamIOType getSourceType();
+
+  /**
+   * Creates a {@code PCollection<BeamSqlRow>} that reads from this table.
+   *
+   * @param pipeline the pipeline the reader is attached to
+   */
+  PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
+
+  /**
+   * Creates an {@code IO.write()} style transform that writes rows to this table.
+   */
+  PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
+
+  /**
+   * Returns the schema information of this table.
+   */
+  BeamSqlRowType getRowType();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java
new file mode 100644
index 0000000..2f78586
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+import java.lang.reflect.ParameterizedType;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * abstract class of aggregation functions in Beam SQL.
+ *
+ * <p>There're several constrains for a UDAF:<br>
+ * 1. A constructor with an empty argument list is required;<br>
+ * 2. The type of {@code InputT} and {@code OutputT} can only be Interger/Long/Short/Byte/Double
+ * /Float/Date/BigDecimal, mapping as SQL type INTEGER/BIGINT/SMALLINT/TINYINE/DOUBLE/FLOAT
+ * /TIMESTAMP/DECIMAL;<br>
+ * 3. Keep intermediate data in {@code AccumT}, and do not rely on elements in class;<br>
+ */
+public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
+  public BeamSqlUdaf(){}
+
+  /**
+   * create an initial aggregation object, equals to {@link CombineFn#createAccumulator()}.
+   */
+  public abstract AccumT init();
+
+  /**
+   * add an input value, equals to {@link CombineFn#addInput(Object, Object)}.
+   */
+  public abstract AccumT add(AccumT accumulator, InputT input);
+
+  /**
+   * merge aggregation objects from parallel tasks, equals to
+   *  {@link CombineFn#mergeAccumulators(Iterable)}.
+   */
+  public abstract AccumT merge(Iterable<AccumT> accumulators);
+
+  /**
+   * extract output value from aggregation object, equals to
+   * {@link CombineFn#extractOutput(Object)}.
+   */
+  public abstract OutputT result(AccumT accumulator);
+
+  /**
+   * get the coder for AccumT which stores the intermediate result.
+   * By default it's fetched from {@link CoderRegistry}.
+   */
+  public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry)
+      throws CannotProvideCoderException {
+    return registry.getCoder(
+        (Class<AccumT>) ((ParameterizedType) getClass()
+        .getGenericSuperclass()).getActualTypeArguments()[1]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
new file mode 100644
index 0000000..191b78e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Interface to create a UDF in Beam SQL.
+ *
+ * <p>A static method {@code eval} is required. Here is an example:
+ *
+ * <blockquote><pre>
+ * public static class MyLeftFunction implements BeamSqlUdf {
+ *   public static String eval(
+ *       &#64;Parameter(name = "s") String s,
+ *       &#64;Parameter(name = "n", optional = true) Integer n) {
+ *     return s.substring(0, n == null ? 1 : n);
+ *   }
+ * }</pre></blockquote>
+ *
+ * <p>The first parameter is named "s" and is mandatory,
+ * and the second parameter is named "n" and is optional.
+ */
+public interface BeamSqlUdf extends Serializable {
+  /** Name of the method that a UDF class has to provide. */
+  String UDF_METHOD = "eval";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
new file mode 100644
index 0000000..53e8483
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.math.BigDecimal;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * Utility methods for working with {@code BeamTable}.
+ */
+public final class BeamTableUtils {
+  public static BeamSqlRow csvLine2BeamSqlRow(
+      CSVFormat csvFormat,
+      String line,
+      BeamSqlRowType beamSqlRowType) {
+    BeamSqlRow row = new BeamSqlRow(beamSqlRowType);
+    try (StringReader reader = new StringReader(line)) {
+      CSVParser parser = csvFormat.parse(reader);
+      CSVRecord rawRecord = parser.getRecords().get(0);
+
+      if (rawRecord.size() != beamSqlRowType.size()) {
+        throw new IllegalArgumentException(String.format(
+            "Expect %d fields, but actually %d",
+            beamSqlRowType.size(), rawRecord.size()
+        ));
+      } else {
+        for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
+          String raw = rawRecord.get(idx);
+          addFieldWithAutoTypeCasting(row, idx, raw);
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("decodeRecord failed!", e);
+    }
+    return row;
+  }
+
+  public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) {
+    StringWriter writer = new StringWriter();
+    try (CSVPrinter printer = csvFormat.print(writer)) {
+      for (int i = 0; i < row.size(); i++) {
+        printer.print(row.getFieldValue(i).toString());
+      }
+      printer.println();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("encodeRecord failed!", e);
+    }
+    return writer.toString();
+  }
+
+  public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
+    if (rawObj == null) {
+      row.addField(idx, null);
+      return;
+    }
+
+    SqlTypeName columnType = CalciteUtils.getFieldType(row.getDataType(), idx);
+    // auto-casting for numberics
+    if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
+        || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
+      String raw = rawObj.toString();
+      switch (columnType) {
+        case TINYINT:
+          row.addField(idx, Byte.valueOf(raw));
+          break;
+        case SMALLINT:
+          row.addField(idx, Short.valueOf(raw));
+          break;
+        case INTEGER:
+          row.addField(idx, Integer.valueOf(raw));
+          break;
+        case BIGINT:
+          row.addField(idx, Long.valueOf(raw));
+          break;
+        case FLOAT:
+          row.addField(idx, Float.valueOf(raw));
+          break;
+        case DOUBLE:
+          row.addField(idx, Double.valueOf(raw));
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              String.format("Column type %s is not supported yet!", columnType));
+      }
+    } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
+      // convert NlsString to String
+      if (rawObj instanceof NlsString) {
+        row.addField(idx, ((NlsString) rawObj).getValue());
+      } else {
+        row.addField(idx, rawObj);
+      }
+    } else {
+      // keep the origin
+      row.addField(idx, rawObj);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
new file mode 100644
index 0000000..2a50947
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * A Kafka topic that saves records as CSV format.
+ *
+ */
+public class BeamKafkaCSVTable extends BeamKafkaTable {
+  private CSVFormat csvFormat;
+  public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+      List<String> topics) {
+    this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
+  }
+
+  public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+      List<String> topics, CSVFormat format) {
+    super(beamSqlRowType, bootstrapServers, topics);
+    this.csvFormat = format;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+      getPTransformForInput() {
+    return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
+  }
+
+  @Override
+  public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+      getPTransformForOutput() {
+    return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
+  }
+
+  /**
+   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}.
+   *
+   */
+  public static class CsvRecorderDecoder
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
+    private BeamSqlRowType rowType;
+    private CSVFormat format;
+    public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) {
+      this.rowType = rowType;
+      this.format = format;
+    }
+
+    @Override
+    public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          String rowInString = new String(c.element().getValue());
+          c.output(BeamTableUtils.csvLine2BeamSqlRow(format, rowInString, rowType));
+        }
+      }));
+    }
+  }
+
+  /**
+   * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}.
+   *
+   */
+  public static class CsvRecorderEncoder
+      extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
+    private BeamSqlRowType rowType;
+    private CSVFormat format;
+    public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) {
+      this.rowType = rowType;
+      this.format = format;
+    }
+
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) {
+      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          BeamSqlRow in = c.element();
+          c.output(KV.of(new byte[] {}, BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes()));
+        }
+      }));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
new file mode 100644
index 0000000..2cc664f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+/**
+ * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
+ * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
+ *
+ */
+public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
+
+  private String bootstrapServers;
+  private List<String> topics;
+  private Map<String, Object> configUpdates;
+
+  protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+      List<String> topics) {
+    super(beamSqlRowType);
+    this.bootstrapServers = bootstrapServers;
+    this.topics = topics;
+  }
+
+  public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
+    this.configUpdates = configUpdates;
+    return this;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+      getPTransformForInput();
+
+  public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+      getPTransformForOutput();
+
+  @Override
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    return PBegin.in(pipeline).apply("read",
+            KafkaIO.<byte[], byte[]>read()
+                .withBootstrapServers(bootstrapServers)
+                .withTopics(topics)
+                .updateConsumerProperties(configUpdates)
+                .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+                .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+                .withoutMetadata())
+            .apply("in_format", getPTransformForInput());
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+    checkArgument(topics != null && topics.size() == 1,
+        "Only one topic can be acceptable as output.");
+
+    return new PTransform<PCollection<BeamSqlRow>, PDone>() {
+      @Override
+      public PDone expand(PCollection<BeamSqlRow> input) {
+        return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
+            KafkaIO.<byte[], byte[]>write()
+                .withBootstrapServers(bootstrapServers)
+                .withTopic(topics.get(0))
+                .withKeySerializer(ByteArraySerializer.class)
+                .withValueSerializer(ByteArraySerializer.class));
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
new file mode 100644
index 0000000..f0ddeb6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * table schema for KafkaIO.
+ */
+package org.apache.beam.sdk.extensions.sql.schema.kafka;