You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/04/18 21:41:00 UTC
[2/5] beam git commit: checkstyle and rename package
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
deleted file mode 100644
index 702381d..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.planner;
-
-import org.apache.calcite.sql.util.SqlShuttle;
-
-/**
- * Visitor for unsupported operators; currently an empty {@link SqlShuttle} subclass.
- * NOTE(review): SqlShuttle is presumably a SqlNode visitor, not a RelNode one - confirm intent.
- */
-public class UnsupportedOperatorsVisitor extends SqlShuttle {
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
deleted file mode 100644
index d98c584..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.beam.dsls.sql.planner.BeamQueryPlanner} is the main interface.
- * It defines data sources, validates a SQL statement, and converts it into a Beam
- * pipeline.
- */
-package org.beam.dsls.sql.planner;
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java
deleted file mode 100644
index 64f2d1f..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rex.RexNode;
-import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
-import org.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-import org.beam.dsls.sql.transform.BeamSQLFilterFn;
-
-/**
- * BeamRelNode to replace a {@code Filter} node.
- * Its pipeline stage is a {@code ParDo} of {@link BeamSQLFilterFn} applied to the latest stream.
- */
-public class BeamFilterRel extends Filter implements BeamRelNode {
-
- public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
- RexNode condition) {
- super(cluster, traits, child, condition);
- }
-
- @Override
- public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
- return new BeamFilterRel(getCluster(), traitSet, input, condition);
- }
- /** Builds the input node, then applies the filter ParDo and records its output stream. */
- @Override
- public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
- // Build the input first; it presumably leaves its output as the creator's latest stream.
- RelNode input = getInput();
- BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
-
- String stageName = BeamSQLRelUtils.getStageName(this);
-
- PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
- // The executor is constructed from this rel node and handed to the filter function.
- BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
- // Despite its name, projectStream holds the output of the filter ParDo.
- PCollection<BeamSQLRow> projectStream = upstream.apply(stageName,
- ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
- // Record this node's output as the latest stream on the creator.
- planCreator.setLatestStream(projectStream);
-
- return planCreator.getPipeline();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
deleted file mode 100644
index 46654e5..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import com.google.common.base.Joiner;
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rex.RexNode;
-import org.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.beam.dsls.sql.schema.BaseBeamTable;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * BeamRelNode to replace a {@code TableModify} node.
- * Its pipeline stage applies the target table's IO writer to the latest stream.
- */
-public class BeamIOSinkRel extends TableModify implements BeamRelNode {
- public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
- Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
- List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
- super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
- sourceExpressionList, flattened);
- }
- /** Copies this node with a new trait set and the single element of {@code inputs}. */
- @Override
- public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs),
- getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
- }
- /** Builds the input node, then applies the target table's writer to the latest stream. */
- @Override
- public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
- // Build the input first; it presumably leaves its output as the creator's latest stream.
- RelNode input = getInput();
- BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
-
- String stageName = BeamSQLRelUtils.getStageName(this);
-
- PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
- // The dot-joined qualified table name is the key into the creator's table registry.
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
- BaseBeamTable targetTable = planCreator.getSourceTables().get(sourceName);
- // NOTE(review): if get() can return null for an unknown name, the next line throws NPE.
- upstream.apply(stageName, targetTable.buildIOWriter());
- // The result of apply is discarded; the creator's latest stream is left unchanged.
- planCreator.setHasPersistent(true);
-
- return planCreator.getPipeline();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
deleted file mode 100644
index f14db92..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import com.google.common.base.Joiner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.TableScan;
-import org.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.beam.dsls.sql.schema.BaseBeamTable;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * BeamRelNode to replace a {@code TableScan} node.
- * Its pipeline stage applies the source table's IO reader directly to the pipeline.
- */
-public class BeamIOSourceRel extends TableScan implements BeamRelNode {
-
- public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
- super(cluster, traitSet, table);
- }
- /** Looks up the source table by name, applies its reader, and records the resulting stream. */
- @Override
- public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
- // Dot-join the qualified name and drop any literal ".(STREAM)" text before the lookup.
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", "");
-
- BaseBeamTable sourceTable = planCreator.getSourceTables().get(sourceName);
- // NOTE(review): if get() can return null for an unknown name, buildIOReader() below throws NPE.
- String stageName = BeamSQLRelUtils.getStageName(this);
-
- PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName,
- sourceTable.buildIOReader());
- // Record the reader's output as the latest stream on the creator.
- planCreator.setLatestStream(sourceStream);
-
- return planCreator.getPipeline();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
deleted file mode 100644
index 50fe8e0..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-
-/**
- * Convention for Beam SQL.
- * Single-constant enum whose node interface is {@link BeamRelNode} and name is "BEAM_LOGICAL".
- */
-public enum BeamLogicalConvention implements Convention {
- INSTANCE;
- // NOTE(review): getInterface() returns a raw Class, presumably to match the interface method.
- @Override
- public Class getInterface() {
- return BeamRelNode.class;
- }
-
- @Override
- public String getName() {
- return "BEAM_LOGICAL";
- }
-
- @Override
- public RelTraitDef getTraitDef() {
- return ConventionTraitDef.INSTANCE;
- }
- /** Satisfied only by this same constant (reference comparison). */
- @Override
- public boolean satisfies(RelTrait trait) {
- return this == trait;
- }
- /** No-op: this method body is empty. */
- @Override
- public void register(RelOptPlanner planner) {
- }
-
- @Override
- public String toString() {
- return getName();
- }
- /** Always returns {@code false}, whatever the target convention. */
- @Override
- public boolean canConvertConvention(Convention toConvention) {
- return false;
- }
- /** Always returns {@code false}, whatever the trait sets. */
- @Override
- public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
deleted file mode 100644
index e41d74e..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
-import org.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-import org.beam.dsls.sql.transform.BeamSQLProjectFn;
-
-/**
- * BeamRelNode to replace a {@code Project} node.
- * Its pipeline stage is a {@code ParDo} of {@link BeamSQLProjectFn} applied to the latest stream.
- */
-public class BeamProjectRel extends Project implements BeamRelNode {
-
- /**
- * {@code projects} entries: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}.
- * NOTE(review): the signature accepts any {@link RexNode}; confirm other kinds are handled.
- */
- public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
- List<? extends RexNode> projects, RelDataType rowType) {
- super(cluster, traits, input, projects, rowType);
- }
-
- @Override
- public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
- RelDataType rowType) {
- return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
- }
- /** Builds the input node, then applies the projection ParDo and records its output stream. */
- @Override
- public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
- RelNode input = getInput();
- BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
-
- String stageName = BeamSQLRelUtils.getStageName(this);
-
- PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
- // The executor is constructed from this rel node and handed to the project function.
- BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
- // The output record type passed to the function is derived from this node's rowType.
- PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
- .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
- // Record this node's output as the latest stream on the creator.
- planCreator.setLatestStream(projectStream);
-
- return planCreator.getPipeline();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
deleted file mode 100644
index 07ffee5..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.calcite.rel.RelNode;
-import org.beam.dsls.sql.planner.BeamPipelineCreator;
-
-/**
- * A {@link RelNode} that additionally declares
- * {@link #buildBeamPipeline(BeamPipelineCreator)}, which is called by
- * {@link BeamPipelineCreator}.
- */
-public interface BeamRelNode extends RelNode {
-
- /**
- * Adds this node's stage to the pipeline. A {@link BeamRelNode} is a recursive
- * structure; the {@link BeamPipelineCreator} visits it depth-first (DFS).
- * @param planCreator build state shared across the traversal
- * @return the {@link Pipeline} being built
- */
- Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
deleted file mode 100644
index 13dc962..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * BeamSQL-specific nodes that replace {@link org.apache.calcite.rel.RelNode}.
- *
- */
-package org.beam.dsls.sql.rel;
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
deleted file mode 100644
index 2ad7c07..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rule;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
-import org.beam.dsls.sql.rel.BeamFilterRel;
-import org.beam.dsls.sql.rel.BeamLogicalConvention;
-
-/**
- * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
- * Registered for {@link LogicalFilter}, from {@link Convention#NONE} to the Beam convention.
- */
-public class BeamFilterRule extends ConverterRule {
- public static final BeamFilterRule INSTANCE = new BeamFilterRule();
-
- private BeamFilterRule() {
- super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
- }
- /** Rebuilds the filter as a {@link BeamFilterRel} over an input converted to Beam traits. */
- @Override
- public RelNode convert(RelNode rel) {
- final Filter filter = (Filter) rel;
- final RelNode input = filter.getInput();
- // Both the node's and the input's trait sets have their convention replaced.
- return new BeamFilterRel(filter.getCluster(),
- filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- filter.getCondition());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
deleted file mode 100644
index a44c002..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rule;
-
-import java.util.List;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
-import org.beam.dsls.sql.rel.BeamIOSinkRel;
-import org.beam.dsls.sql.rel.BeamLogicalConvention;
-
-/**
- * A {@code ConverterRule} to replace {@link TableModify} with
- * {@link BeamIOSinkRel}.
- * Only {@code INSERT} on a {@code TABLE} or {@code STREAM} is converted; anything else throws.
- */
-public class BeamIOSinkRule extends ConverterRule {
- public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
-
- private BeamIOSinkRule() {
- super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
- "BeamIOSinkRule");
- }
- /** Copies the modify node's fields into a {@link BeamIOSinkRel} after validating it. */
- @Override
- public RelNode convert(RelNode rel) {
- final TableModify tableModify = (TableModify) rel;
- final RelNode input = tableModify.getInput();
-
- final RelOptCluster cluster = tableModify.getCluster();
- final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
- final RelOptTable relOptTable = tableModify.getTable();
- final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
- final RelNode convertedInput = convert(input,
- input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
- final TableModify.Operation operation = tableModify.getOperation();
- final List<String> updateColumnList = tableModify.getUpdateColumnList();
- final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
- final boolean flattened = tableModify.isFlattened();
- // NOTE(review): if unwrap() can return null here, the switch below throws NPE - confirm.
- final Table table = tableModify.getTable().unwrap(Table.class);
- // TABLE falls through to STREAM: both table types are handled identically.
- switch (table.getJdbcTableType()) {
- case TABLE:
- case STREAM:
- if (operation != TableModify.Operation.INSERT) {
- throw new UnsupportedOperationException(
- String.format("Streams doesn't support %s modify operation", operation));
- }
- return new BeamIOSinkRel(cluster, traitSet,
- relOptTable, catalogReader, convertedInput, operation, updateColumnList,
- sourceExpressionList, flattened);
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported table type: %s", table.getJdbcTableType()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
deleted file mode 100644
index 9e4778b..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rule;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalTableScan;
-import org.beam.dsls.sql.rel.BeamIOSourceRel;
-import org.beam.dsls.sql.rel.BeamLogicalConvention;
-
-/**
- * A {@code ConverterRule} to replace {@link TableScan} with
- * {@link BeamIOSourceRel}.
- * Registered for {@link LogicalTableScan}, from {@link Convention#NONE} to the Beam convention.
- */
-public class BeamIOSourceRule extends ConverterRule {
- public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
-
- private BeamIOSourceRule() {
- super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
- "BeamIOSourceRule");
- }
- /** Rebuilds the scan as a {@link BeamIOSourceRel} over the same table with Beam traits. */
- @Override
- public RelNode convert(RelNode rel) {
- final TableScan scan = (TableScan) rel;
- // Only the convention in the trait set changes; cluster and table are passed through.
- return new BeamIOSourceRel(scan.getCluster(),
- scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
deleted file mode 100644
index 117a056..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rule;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.beam.dsls.sql.rel.BeamProjectRel;
-
-/**
- * A {@code ConverterRule} to replace {@link Project} with
- * {@link BeamProjectRel}.
- *
- */
-public class BeamProjectRule extends ConverterRule {
- public static final BeamProjectRule INSTANCE = new BeamProjectRule();
-
- private BeamProjectRule() {
- super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final Project project = (Project) rel;
- final RelNode input = project.getInput();
-
- return new BeamProjectRel(project.getCluster(),
- project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- project.getProjects(), project.getRowType());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
deleted file mode 100644
index 56ddcf3..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.calcite.plan.RelOptRule} to generate {@link org.beam.dsls.sql.rel.BeamRelNode}.
- */
-package org.beam.dsls.sql.rule;
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
deleted file mode 100644
index 3816063..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.ScannableTable;
-import org.apache.calcite.schema.Schema.TableType;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
-import org.beam.dsls.sql.planner.BeamQueryPlanner;
-
-/**
- * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
- */
-public abstract class BaseBeamTable implements ScannableTable, Serializable {
-
- /**
- *
- */
- private static final long serialVersionUID = -1262988061830914193L;
- private RelDataType relDataType;
-
- protected BeamSQLRecordType beamSqlRecordType;
-
- public BaseBeamTable(RelProtoDataType protoRowType) {
- this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY);
- this.beamSqlRecordType = BeamSQLRecordType.from(relDataType);
- }
-
- /**
- * In Beam SQL, there's no difference between a batch query and a streaming
- * query. {@link BeamIOType} is used to validate the sources.
- */
- public abstract BeamIOType getSourceType();
-
- /**
- * create a {@code IO.read()} instance to read from source.
- *
- */
- public abstract PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader();
-
- /**
- * create a {@code IO.write()} instance to write to target.
- *
- */
- public abstract PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter();
-
- @Override
- public Enumerable<Object[]> scan(DataContext root) {
- // not used as Beam SQL uses its own execution engine
- return null;
- }
-
- @Override
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return relDataType;
- }
-
- /**
- * {@link Statistic} is not used to optimize the plan.
- */
- @Override
- public Statistic getStatistic() {
- return Statistics.UNKNOWN;
- }
-
- /**
- * all sources are treated as TABLE in Beam SQL.
- */
- @Override
- public TableType getJdbcTableType() {
- return TableType.TABLE;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
deleted file mode 100644
index 5e55b0f..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Type of a source IO, which determines whether it's a STREAMING process or a batch
- * process.
- */
-public enum BeamIOType implements Serializable {
- BOUNDED, UNBOUNDED;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
deleted file mode 100644
index dc8e381..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Field type information in {@link BeamSQLRow}.
- *
- */
-//@DefaultCoder(BeamSQLRecordTypeCoder.class)
-public class BeamSQLRecordType implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = -5318734648766104712L;
- private List<String> fieldsName = new ArrayList<>();
- private List<SqlTypeName> fieldsType = new ArrayList<>();
-
- public static BeamSQLRecordType from(RelDataType tableInfo) {
- BeamSQLRecordType record = new BeamSQLRecordType();
- for (RelDataTypeField f : tableInfo.getFieldList()) {
- record.fieldsName.add(f.getName());
- record.fieldsType.add(f.getType().getSqlTypeName());
- }
- return record;
- }
-
- public int size() {
- return fieldsName.size();
- }
-
- public List<String> getFieldsName() {
- return fieldsName;
- }
-
- public void setFieldsName(List<String> fieldsName) {
- this.fieldsName = fieldsName;
- }
-
- public List<SqlTypeName> getFieldsType() {
- return fieldsType;
- }
-
- public void setFieldsType(List<SqlTypeName> fieldsType) {
- this.fieldsType = fieldsType;
- }
-
- @Override
- public String toString() {
- return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
deleted file mode 100644
index 2989cb9..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * A {@link Coder} for {@link BeamSQLRecordType}.
- *
- */
-public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> {
- private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
- private static final VarIntCoder intCoder = VarIntCoder.of();
-
- private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder();
- private BeamSQLRecordTypeCoder(){}
-
- public static BeamSQLRecordTypeCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(BeamSQLRecordType value, OutputStream outStream,
- org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
- Context nested = context.nested();
- intCoder.encode(value.size(), outStream, nested);
- for(String fieldName : value.getFieldsName()){
- stringCoder.encode(fieldName, outStream, nested);
- }
- for(SqlTypeName fieldType : value.getFieldsType()){
- stringCoder.encode(fieldType.name(), outStream, nested);
- }
- outStream.flush();
- }
-
- @Override
- public BeamSQLRecordType decode(InputStream inStream,
- org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
- BeamSQLRecordType typeRecord = new BeamSQLRecordType();
- Context nested = context.nested();
- int size = intCoder.decode(inStream, nested);
- for(int idx=0; idx<size; ++idx){
- typeRecord.getFieldsName().add(stringCoder.decode(inStream, nested));
- }
- for(int idx=0; idx<size; ++idx){
- typeRecord.getFieldsType().add(SqlTypeName.valueOf(stringCoder.decode(inStream, nested)));
- }
- return typeRecord;
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void verifyDeterministic()
- throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
deleted file mode 100644
index db93168..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Represent a generic ROW record in Beam SQL.
- *
- */
-public class BeamSQLRow implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = 4569220242480160895L;
-
- private List<Integer> nullFields = new ArrayList<>();
- private List<Object> dataValues;
- private BeamSQLRecordType dataType;
-
- public BeamSQLRow(BeamSQLRecordType dataType) {
- this.dataType = dataType;
- this.dataValues = new ArrayList<>();
- for(int idx=0; idx<dataType.size(); ++idx){
- dataValues.add(null);
- }
- }
-
- public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) {
- this.dataValues = dataValues;
- this.dataType = dataType;
- }
-
- public void addField(String fieldName, Object fieldValue) {
- addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
- }
-
- public void addField(int index, Object fieldValue) {
- if(fieldValue == null){
- dataValues.set(index, fieldValue);
- if(!nullFields.contains(index)){nullFields.add(index);}
- return;
- }
-
- SqlTypeName fieldType = dataType.getFieldsType().get(index);
- switch (fieldType) {
- case INTEGER:
- case SMALLINT:
- case TINYINT:
- if(!(fieldValue instanceof Integer)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case DOUBLE:
- if(!(fieldValue instanceof Double)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case BIGINT:
- if(!(fieldValue instanceof Long)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case FLOAT:
- if(!(fieldValue instanceof Float)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case VARCHAR:
- if(!(fieldValue instanceof String)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case TIME:
- case TIMESTAMP:
- if(!(fieldValue instanceof Date)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- default:
- throw new UnsupportedDataTypeException(fieldType);
- }
- dataValues.set(index, fieldValue);
- }
-
-
- public int getInteger(int idx) {
- return (Integer) getFieldValue(idx);
- }
-
- public double getDouble(int idx) {
- return (Double) getFieldValue(idx);
- }
-
- public long getLong(int idx) {
- return (Long) getFieldValue(idx);
- }
-
- public String getString(int idx) {
- return (String) getFieldValue(idx);
- }
-
- public Date getDate(int idx) {
- return (Date) getFieldValue(idx);
- }
-
- public Object getFieldValue(String fieldName) {
- return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
- }
-
- public Object getFieldValue(int fieldIdx) {
- if(nullFields.contains(fieldIdx)){
- return null;
- }
-
- Object fieldValue = dataValues.get(fieldIdx);
- SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx);
-
- switch (fieldType) {
- case INTEGER:
- case SMALLINT:
- case TINYINT:
- if(!(fieldValue instanceof Integer)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }else{
- return Integer.valueOf(fieldValue.toString());
- }
- case DOUBLE:
- if(!(fieldValue instanceof Double)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }else{
- return Double.valueOf(fieldValue.toString());
- }
- case BIGINT:
- if(!(fieldValue instanceof Long)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }else{
- return Long.valueOf(fieldValue.toString());
- }
- case FLOAT:
- if(!(fieldValue instanceof Float)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }else{
- return Float.valueOf(fieldValue.toString());
- }
- case VARCHAR:
- if(!(fieldValue instanceof String)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }else{
- return fieldValue.toString();
- }
- case TIME:
- case TIMESTAMP:
- if(!(fieldValue instanceof Date)){
- throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }else{
- return fieldValue;
- }
- default:
- throw new UnsupportedDataTypeException(fieldType);
- }
- }
-
- public int size() {
- return dataValues.size();
- }
-
- public List<Object> getDataValues() {
- return dataValues;
- }
-
- public void setDataValues(List<Object> dataValues) {
- this.dataValues = dataValues;
- }
-
- public BeamSQLRecordType getDataType() {
- return dataType;
- }
-
- public void setDataType(BeamSQLRecordType dataType) {
- this.dataType = dataType;
- }
-
- public void setNullFields(List<Integer> nullFields) {
- this.nullFields = nullFields;
- }
-
- public List<Integer> getNullFields() {
- return nullFields;
- }
-
- @Override
- public String toString() {
- return "BeamSQLRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
- }
-
- /**
- * Return data fields as key=value.
- */
- public String valueInString() {
- StringBuffer sb = new StringBuffer();
- for (int idx = 0; idx < size(); ++idx) {
- sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
- }
- return sb.substring(1);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- BeamSQLRow other = (BeamSQLRow) obj;
- return toString().equals(other.toString());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
deleted file mode 100644
index 00af18d..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Date;
-import java.util.List;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.DoubleCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-
-/**
- * A {@link Coder} that encodes {@link BeamSQLRow}.
- *
- */
-public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
- private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of();
-
- private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
-
- private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
- private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
- private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
- private static final DoubleCoder doubleCoder = DoubleCoder.of();
-
- private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
- private BeamSqlRowCoder(){}
-
- public static BeamSqlRowCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(BeamSQLRow value, OutputStream outStream,
- org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
- recordTypeCoder.encode(value.getDataType(), outStream, context);
- listCoder.encode(value.getNullFields(), outStream, context);
-
- Context nested = context.nested();
-
- for (int idx = 0; idx < value.size(); ++idx) {
- if(value.getNullFields().contains(idx)){
- continue;
- }
-
- switch (value.getDataType().getFieldsType().get(idx)) {
- case INTEGER:
- case SMALLINT:
- case TINYINT:
- intCoder.encode(value.getInteger(idx), outStream, nested);
- break;
- case DOUBLE:
- case FLOAT:
- doubleCoder.encode(value.getDouble(idx), outStream, nested);
- break;
- case BIGINT:
- longCoder.encode(value.getLong(idx), outStream, nested);
- break;
- case VARCHAR:
- stringCoder.encode(value.getString(idx), outStream, nested);
- break;
- case TIME:
- case TIMESTAMP:
- longCoder.encode(value.getDate(idx).getTime(), outStream, nested);
- break;
-
- default:
- throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx));
- }
- }
- }
-
- @Override
- public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
- BeamSQLRecordType type = recordTypeCoder.decode(inStream, context);
- List<Integer> nullFields = listCoder.decode(inStream, context);
-
- BeamSQLRow record = new BeamSQLRow(type);
- record.setNullFields(nullFields);
-
- for (int idx = 0; idx < type.size(); ++idx) {
- if(nullFields.contains(idx)){
- continue;
- }
-
- switch (type.getFieldsType().get(idx)) {
- case INTEGER:
- case SMALLINT:
- case TINYINT:
- record.addField(idx, intCoder.decode(inStream, context));
- break;
- case DOUBLE:
- case FLOAT:
- record.addField(idx, doubleCoder.decode(inStream, context));
- break;
- case BIGINT:
- record.addField(idx, longCoder.decode(inStream, context));
- break;
- case VARCHAR:
- record.addField(idx, stringCoder.decode(inStream, context));
- break;
- case TIME:
- case TIMESTAMP:
- record.addField(idx, new Date(longCoder.decode(inStream, context)));
- break;
-
- default:
- throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
- }
- }
-
- return record;
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return null;
- }
-
- @Override
- public void verifyDeterministic()
- throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
deleted file mode 100644
index 6240426..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-public class InvalidFieldException extends RuntimeException {
-
- public InvalidFieldException() {
- super();
- }
-
- public InvalidFieldException(String message) {
- super(message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
deleted file mode 100644
index 9a2235e..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import org.apache.calcite.sql.type.SqlTypeName;
-
-public class UnsupportedDataTypeException extends RuntimeException {
-
- public UnsupportedDataTypeException(SqlTypeName unsupportedType){
- super(String.format("Not support data type [%s]", unsupportedType));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
deleted file mode 100644
index 2570763..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema.kafka;
-
-import java.util.List;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Kafka topic that saves records as CSV format.
- *
- */
-public class BeamKafkaCSVTable extends BeamKafkaTable {
-
- /**
- *
- */
- private static final long serialVersionUID = 4754022536543333984L;
-
- public static final String DELIMITER = ",";
- private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaCSVTable.class);
-
- public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers,
- List<String> topics) {
- super(protoRowType, bootstrapServers, topics);
- }
-
- @Override
- public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
- getPTransformForInput() {
- return new CsvRecorderDecoder(beamSqlRecordType);
- }
-
- @Override
- public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
- getPTransformForOutput() {
- return new CsvRecorderEncoder(beamSqlRecordType);
- }
-
- /**
- * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSQLRow}.
- *
- */
- public static class CsvRecorderDecoder
- extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> {
- private BeamSQLRecordType recordType;
-
- public CsvRecorderDecoder(BeamSQLRecordType recordType) {
- this.recordType = recordType;
- }
-
- @Override
- public PCollection<BeamSQLRow> expand(PCollection<KV<byte[], byte[]>> input) {
- return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSQLRow>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- String rowInString = new String(c.element().getValue());
- String[] parts = rowInString.split(BeamKafkaCSVTable.DELIMITER);
- if (parts.length != recordType.size()) {
- LOG.error(String.format("invalid record: ", rowInString));
- } else {
- BeamSQLRow sourceRecord = new BeamSQLRow(recordType);
- for (int idx = 0; idx < parts.length; ++idx) {
- sourceRecord.addField(idx, parts[idx]);
- }
- c.output(sourceRecord);
- }
- }
- }));
- }
- }
-
- /**
- * A PTransform to convert {@link BeamSQLRow} to {@code KV<byte[], byte[]>}.
- *
- */
- public static class CsvRecorderEncoder
- extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> {
- private BeamSQLRecordType recordType;
-
- public CsvRecorderEncoder(BeamSQLRecordType recordType) {
- this.recordType = recordType;
- }
-
- @Override
- public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSQLRow> input) {
- return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, KV<byte[], byte[]>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- BeamSQLRow in = c.element();
- StringBuffer sb = new StringBuffer();
- for (int idx = 0; idx < in.size(); ++idx) {
- sb.append(DELIMITER);
- sb.append(in.getFieldValue(idx).toString());
- }
- c.output(KV.of(new byte[] {}, sb.substring(1).getBytes()));
- }
- }));
-
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
deleted file mode 100644
index 482383b..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema.kafka;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.beam.dsls.sql.schema.BaseBeamTable;
-import org.beam.dsls.sql.schema.BeamIOType;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
- * extend to convert between {@code BeamSQLRow} and {@code KV<byte[], byte[]>}.
- *
- */
-public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
-
- /**
- *
- */
- private static final long serialVersionUID = -634715473399906527L;
-
- private String bootstrapServers;
- private List<String> topics;
- private Map<String, Object> configUpdates;
-
- protected BeamKafkaTable(RelProtoDataType protoRowType) {
- super(protoRowType);
- }
-
- public BeamKafkaTable(RelProtoDataType protoRowType, String bootstrapServers,
- List<String> topics) {
- super(protoRowType);
- this.bootstrapServers = bootstrapServers;
- this.topics = topics;
- }
-
- public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
- this.configUpdates = configUpdates;
- return this;
- }
-
- @Override
- public BeamIOType getSourceType() {
- return BeamIOType.UNBOUNDED;
- }
-
- public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
- getPTransformForInput();
-
- public abstract PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
- getPTransformForOutput();
-
- @Override
- public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
- return new PTransform<PBegin, PCollection<BeamSQLRow>>() {
-
- @Override
- public PCollection<BeamSQLRow> expand(PBegin input) {
- return input.apply("read",
- KafkaIO.<byte[], byte[]>read().withBootstrapServers(bootstrapServers).withTopics(topics)
- .updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of())
- .withValueCoder(ByteArrayCoder.of()).withoutMetadata())
- .apply("in_format", getPTransformForInput());
-
- }
- };
- }
-
- @Override
- public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
- checkArgument(topics != null && topics.size() == 1,
- "Only one topic can be acceptable as output.");
-
- return new PTransform<PCollection<BeamSQLRow>, PDone>() {
- @Override
- public PDone expand(PCollection<BeamSQLRow> input) {
- return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
- KafkaIO.<byte[], byte[]>write().withBootstrapServers(bootstrapServers)
- .withTopic(topics.get(0)).withKeyCoder(ByteArrayCoder.of())
- .withValueCoder(ByteArrayCoder.of()));
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
deleted file mode 100644
index 822fce7..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * table schema for KafkaIO.
- */
-package org.beam.dsls.sql.schema.kafka;
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
deleted file mode 100644
index ef9cc7d..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * define table schema, to map with Beam IO components.
- *
- */
-package org.beam.dsls.sql.schema;
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
deleted file mode 100644
index 06db280..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.transform;
-
-import java.util.List;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.beam.dsls.sql.rel.BeamFilterRel;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * {@code BeamSQLFilterFn} is the executor for a {@link BeamFilterRel} step.
- *
- */
-public class BeamSQLFilterFn extends DoFn<BeamSQLRow, BeamSQLRow> {
- /**
- *
- */
- private static final long serialVersionUID = -1256111753670606705L;
-
- private String stepName;
- private BeamSQLExpressionExecutor executor;
-
- public BeamSQLFilterFn(String stepName, BeamSQLExpressionExecutor executor) {
- super();
- this.stepName = stepName;
- this.executor = executor;
- }
-
- @Setup
- public void setup() {
- executor.prepare();
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- BeamSQLRow in = c.element();
-
- List<Object> result = executor.execute(in);
-
- if ((Boolean) result.get(0)) {
- c.output(in);
- }
- }
-
- @Teardown
- public void close() {
- executor.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
deleted file mode 100644
index 1014c0d..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.transform;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * A test PTransform to display output in console.
- *
- */
-public class BeamSQLOutputToConsoleFn extends DoFn<BeamSQLRow, Void> {
- /**
- *
- */
- private static final long serialVersionUID = -1256111753670606705L;
-
- private String stepName;
-
- public BeamSQLOutputToConsoleFn(String stepName) {
- super();
- this.stepName = stepName;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- System.out.println("Output: " + c.element().getDataValues());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
deleted file mode 100644
index 12061d2..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.transform;
-
-import java.util.List;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.beam.dsls.sql.rel.BeamProjectRel;
-import org.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- *
- * {@code BeamSQLProjectFn} is the executor for a {@link BeamProjectRel} step.
- *
- */
-public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> {
-
- /**
- *
- */
- private static final long serialVersionUID = -1046605249999014608L;
- private String stepName;
- private BeamSQLExpressionExecutor executor;
- private BeamSQLRecordType outputRecordType;
-
- public BeamSQLProjectFn(String stepName, BeamSQLExpressionExecutor executor,
- BeamSQLRecordType outputRecordType) {
- super();
- this.stepName = stepName;
- this.executor = executor;
- this.outputRecordType = outputRecordType;
- }
-
- @Setup
- public void setup() {
- executor.prepare();
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- List<Object> results = executor.execute(c.element());
-
- BeamSQLRow outRow = new BeamSQLRow(outputRecordType);
- for (int idx = 0; idx < results.size(); ++idx) {
- outRow.addField(idx, results.get(idx));
- }
-
- c.output(outRow);
- }
-
- @Teardown
- public void close() {
- executor.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
deleted file mode 100644
index 2607abf..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSQL pipeline.
- */
-package org.beam.dsls.sql.transform;
http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
new file mode 100644
index 0000000..733b056
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.BeforeClass;
+
+/**
+ * prepare {@code BeamSqlRunner} for test.
+ *
+ */
+public class BasePlanner {
+ public static BeamSqlRunner runner = new BeamSqlRunner();
+
+ @BeforeClass
+ public static void prepare() {
+ runner.addTable("ORDER_DETAILS", getTable());
+ runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+ runner.addTable("SUB_ORDER_RAM", getTable());
+ }
+
+ private static BaseBeamTable getTable() {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+ }
+ };
+
+ return new MockedBeamSQLTable(protoRowType);
+ }
+
+ public static BaseBeamTable getTable(String bootstrapServer, String topic) {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+ }
+ };
+
+ Map<String, Object> consumerPara = new HashMap<String, Object>();
+ consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+
+ return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic))
+ .updateConsumerProperties(consumerPara);
+ }
+}