You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:02 UTC
[39/59] beam git commit: rename package org.apache.beam.dsls.sql to
org.apache.beam.sdk.extensions.sql
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
deleted file mode 100644
index 144acbf..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.interpreter.operator.string;
-
-import java.util.List;
-
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'POSITION' operator: locates one string inside another.
- *
- * <p>Supported forms:
- * <pre>
- *   POSITION(string1 IN string2)
- *   POSITION(string1 IN string2 FROM integer)
- * </pre>
- *
- * <p>The result is exactly what {@link String#indexOf(String, int)} reports: a zero-based
- * offset, or {@code -1} when {@code string1} does not occur in {@code string2}.
- */
-public class BeamSqlPositionExpression extends BeamSqlExpression {
-  public BeamSqlPositionExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.INTEGER);
-  }
-
-  /**
-   * Accepts two character operands, optionally followed by one integer operand.
-   */
-  @Override
-  public boolean accept() {
-    int operandCount = operands.size();
-    if (operandCount != 2 && operandCount != 3) {
-      return false;
-    }
-
-    boolean stringsFirst = SqlTypeName.CHAR_TYPES.contains(opType(0))
-        && SqlTypeName.CHAR_TYPES.contains(opType(1));
-    if (!stringsFirst) {
-      return false;
-    }
-
-    // The optional third operand is the search start and must be an integer type.
-    return operandCount == 2 || SqlTypeName.INT_TYPES.contains(opType(2));
-  }
-
-  /**
-   * Evaluates the operands against {@code inputRow} and searches operand 1 for operand 0.
-   *
-   * @return an INTEGER primitive holding the zero-based match offset, or {@code -1}
-   */
-  @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String needle = opValueEvaluated(0, inputRow);
-    String haystack = opValueEvaluated(1, inputRow);
-
-    // Without a FROM operand the search starts at -1, which indexOf treats the same as 0.
-    int searchFrom = -1;
-    if (operands.size() == 3) {
-      Number fromOperand = opValueEvaluated(2, inputRow);
-      searchFrom = fromOperand.intValue();
-    }
-
-    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, haystack.indexOf(needle, searchFrom));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
deleted file mode 100644
index d931db9..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.interpreter.operator.string;
-
-import java.util.List;
-
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Common parent of the string operators that take exactly one character operand.
- */
-public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression {
-  public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  /**
-   * Accepts a single operand whose type is one of {@link SqlTypeName#CHAR_TYPES}.
-   */
-  @Override
-  public boolean accept() {
-    return operands.size() == 1 && SqlTypeName.CHAR_TYPES.contains(opType(0));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
deleted file mode 100644
index 8b33125..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.interpreter.operator.string;
-
-import java.util.List;
-
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'SUBSTRING' operator.
- *
- * <p>Supported forms:
- * <pre>
- *   SUBSTRING(string FROM integer)
- *   SUBSTRING(string FROM integer FOR integer)
- * </pre>
- *
- * <p>The start position is 1-based; a negative start counts back from the end of the string.
- * A start of {@code 0}, or one that falls outside the string, yields an empty string. A
- * negative length is treated as {@code 0}, and a length that runs past the end of the string
- * is truncated to the end.
- */
-public class BeamSqlSubstringExpression extends BeamSqlExpression {
-  public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  /**
-   * Accepts a character operand and an integer start, optionally followed by an integer
-   * length.
-   */
-  @Override
-  public boolean accept() {
-    if (operands.size() < 2 || operands.size() > 3) {
-      return false;
-    }
-
-    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
-        || !SqlTypeName.INT_TYPES.contains(opType(1))) {
-      return false;
-    }
-
-    if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) {
-      return false;
-    }
-
-    return true;
-  }
-
-  /**
-   * Evaluates the operands against {@code inputRow} and extracts the requested substring.
-   *
-   * @return a VARCHAR primitive; never fails for an out-of-range start or length
-   */
-  @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String str = opValueEvaluated(0, inputRow);
-    // accept() admits every INT_TYPES member, so read the operand as a Number (as the
-    // POSITION operator does) instead of unboxing straight to int.
-    Number startOperand = opValueEvaluated(1, inputRow);
-    int startIdx = startOperand.intValue();
-    if (startIdx > 0) {
-      // SQL positions are 1-based; Java offsets are 0-based.
-      startIdx -= 1;
-    } else if (startIdx < 0) {
-      // A negative position counts back from the end of the string.
-      startIdx += str.length();
-    } else {
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
-    }
-
-    // A start outside the string would make String.substring throw; nothing is selected.
-    if (startIdx < 0 || startIdx > str.length()) {
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
-    }
-
-    if (operands.size() == 3) {
-      Number lengthOperand = opValueEvaluated(2, inputRow);
-      // Negative lengths select nothing. Clamping against the remaining characters (rather
-      // than computing startIdx + length first) also avoids int overflow for huge lengths.
-      int length = Math.max(lengthOperand.intValue(), 0);
-      int endIdx = startIdx + Math.min(length, str.length() - startIdx);
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx, endIdx));
-    } else {
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx));
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
deleted file mode 100644
index 5e6c2bb..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'TRIM' operator.
- *
- * <p>
- * TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2)
- * </p>
- *
- * <p>With a single character operand the value is trimmed with {@link String#trim()}. With
- * three operands, repeated occurrences of {@code string1} are stripped from the requested
- * end(s) of {@code string2}.
- */
-public class BeamSqlTrimExpression extends BeamSqlExpression {
-  public BeamSqlTrimExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  /**
-   * Accepts either one character operand, or a SYMBOL (the trim flag) followed by two
-   * character operands.
-   */
-  @Override
-  public boolean accept() {
-    if (operands.size() != 1 && operands.size() != 3) {
-      return false;
-    }
-
-    if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) {
-      return false;
-    }
-
-    if (operands.size() == 3
-        && (SqlTypeName.SYMBOL != opType(0)
-            || !SqlTypeName.CHAR_TYPES.contains(opType(1))
-            || !SqlTypeName.CHAR_TYPES.contains(opType(2)))) {
-      return false;
-    }
-
-    return true;
-  }
-
-  /**
-   * Evaluates the operands against {@code inputRow} and trims accordingly.
-   *
-   * @return a VARCHAR primitive holding the trimmed string
-   */
-  @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    if (operands.size() == 1) {
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-          opValueEvaluated(0, inputRow).toString().trim());
-    } else {
-      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
-      String targetStr = opValueEvaluated(1, inputRow);
-      String containingStr = opValueEvaluated(2, inputRow);
-
-      switch (type) {
-        case LEADING:
-          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, leadingTrim(containingStr, targetStr));
-        case TRAILING:
-          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, trailingTrim(containingStr, targetStr));
-        case BOTH:
-        default:
-          // Any other flag is handled like BOTH.
-          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-              trailingTrim(leadingTrim(containingStr, targetStr), targetStr));
-      }
-    }
-  }
-
-  /**
-   * Removes every consecutive occurrence of {@code targetStr} from the start of
-   * {@code containingStr}.
-   *
-   * @return the remainder; {@code containingStr} unchanged when {@code targetStr} is empty
-   */
-  static String leadingTrim(String containingStr, String targetStr) {
-    int step = targetStr.length();
-    if (step == 0) {
-      // An empty target matches at every offset, so the loop below would never advance.
-      return containingStr;
-    }
-
-    int idx = 0;
-    while (containingStr.startsWith(targetStr, idx)) {
-      idx += step;
-    }
-
-    return containingStr.substring(idx);
-  }
-
-  /**
-   * Removes every consecutive occurrence of {@code targetStr} from the end of
-   * {@code containingStr}.
-   *
-   * @return the remainder; {@code containingStr} unchanged when {@code targetStr} is empty
-   */
-  static String trailingTrim(String containingStr, String targetStr) {
-    int step = targetStr.length();
-    if (step == 0) {
-      // An empty target matches at every offset, so the loop below would never advance.
-      return containingStr;
-    }
-
-    // endIdx is the exclusive end of what is kept; startsWith is false for negative offsets,
-    // so the loop stops once fewer than step characters remain.
-    int endIdx = containingStr.length();
-    while (containingStr.startsWith(targetStr, endIdx - step)) {
-      endIdx -= step;
-    }
-
-    return containingStr.substring(0, endIdx);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
deleted file mode 100644
index efa9c95..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.interpreter.operator.string;
-
-import java.util.List;
-
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'UPPER' operator: converts its single character operand to upper case.
- */
-public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
-  public BeamSqlUpperExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  /**
-   * Evaluates the operand against {@code inputRow} and upper-cases it.
-   *
-   * @return a VARCHAR primitive holding the upper-cased string
-   */
-  @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String str = opValueEvaluated(0, inputRow);
-    // Use a fixed locale: the no-arg toUpperCase() follows the JVM default locale, so the
-    // same query could give different results on differently configured machines.
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase(java.util.Locale.ROOT));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java
deleted file mode 100644
index f2c63f3..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * String operators.
- */
-package org.apache.beam.dsls.sql.interpreter.operator.string;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java
deleted file mode 100644
index 178d35f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * The interpreter generates runnable 'code' to execute SQL relational expressions.
- */
-package org.apache.beam.dsls.sql.interpreter;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java
deleted file mode 100644
index b26e8c4..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * BeamSQL provides a new interface to run a SQL statement with Beam.
- */
-package org.apache.beam.dsls.sql;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
deleted file mode 100644
index 93f9a2f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The core component that takes a SQL statement from parsing and validation through to a
- * Beam pipeline.
- */
-public class BeamQueryPlanner {
-  private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
-
-  protected final Planner planner;
-  private final Map<String, BaseBeamTable> sourceTables = new HashMap<>();
-
-  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
-      RelDataTypeSystem.DEFAULT);
-
-  /**
-   * Builds a Calcite planner over {@code schema} and records every table of the schema as a
-   * source table.
-   *
-   * @param schema the schema to plan against; each of its tables is cast to
-   *     {@link BaseBeamTable}
-   */
-  public BeamQueryPlanner(SchemaPlus schema) {
-    final List<RelTraitDef> traitDefs = new ArrayList<>();
-    traitDefs.add(ConventionTraitDef.INSTANCE);
-    traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-    // Operators are resolved from the standard table first, then from the schema's catalog.
-    List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-    sqlOperatorTables.add(SqlStdOperatorTable.instance());
-    sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false,
-        Collections.<String>emptyList(), TYPE_FACTORY));
-
-    FrameworkConfig config = Frameworks.newConfigBuilder()
-        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build())
-        .defaultSchema(schema)
-        .traitDefs(traitDefs)
-        .context(Contexts.EMPTY_CONTEXT)
-        .ruleSets(BeamRuleSets.getRuleSets())
-        .costFactory(null)
-        .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
-        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
-        .build();
-    this.planner = Frameworks.getPlanner(config);
-
-    for (String t : schema.getTableNames()) {
-      sourceTables.put(t, (BaseBeamTable) schema.getTable(t));
-    }
-  }
-
-  /**
-   * Parses the input SQL query and returns a {@link SqlNode} as its grammar tree.
-   *
-   * @throws SqlParseException if the planner cannot parse {@code sqlQuery}
-   */
-  public SqlNode parseQuery(String sqlQuery) throws SqlParseException {
-    return planner.parse(sqlQuery);
-  }
-
-  /**
-   * Translates a SQL statement into a Beam data flow that is linked with the given
-   * {@code basePipeline}. The final output stream is returned as a {@code PCollection} so
-   * more operations can be applied.
-   *
-   * @param sqlStatement the SQL statement to compile
-   * @param basePipeline the pipeline the generated transforms are attached to
-   * @param sqlEnv passed through to {@link BeamRelNode#buildBeamPipeline}
-   */
-  public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline,
-      BeamSqlEnv sqlEnv) throws Exception {
-    BeamRelNode relNode = convertToBeamRel(sqlStatement);
-
-    // The input PCollectionTuple is empty; it is rebuilt in BeamIOSourceRel.
-    return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv);
-  }
-
-  /**
-   * Parses and validates the input query, then converts it into a {@link BeamRelNode} tree.
-   * The planner is closed afterwards, whether or not the conversion succeeds.
-   */
-  public BeamRelNode convertToBeamRel(String sqlStatement)
-      throws ValidationException, RelConversionException, SqlParseException {
-    try {
-      return (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
-    } finally {
-      planner.close();
-    }
-  }
-
-  /** Validates {@code sqlNode}, converts it to a Calcite plan, then to a Beam plan. */
-  private RelNode validateAndConvert(SqlNode sqlNode)
-      throws ValidationException, RelConversionException {
-    SqlNode validated = validateNode(sqlNode);
-    LOG.info("SQL:\n{}", validated);
-    RelNode relNode = convertToRelNode(validated);
-    return convertToBeamRel(relNode);
-  }
-
-  /** Transforms a Calcite plan into one that uses {@link BeamLogicalConvention}. */
-  private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException {
-    RelTraitSet traitSet = relNode.getTraitSet();
-
-    LOG.info("SQLPlan>\n{}", RelOptUtil.toString(relNode));
-
-    // PlannerImpl.transform() optimizes RelNode with ruleset
-    return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode);
-  }
-
-  private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
-    return planner.rel(sqlNode).rel;
-  }
-
-  private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
-    return planner.validate(sqlNode);
-  }
-
-  /** Returns the tables registered from the schema at construction time, keyed by name. */
-  public Map<String, BaseBeamTable> getSourceTables() {
-    return sourceTables;
-  }
-
-  /** Returns the underlying Calcite planner. */
-  public Planner getPlanner() {
-    return planner;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
deleted file mode 100644
index c89a740..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
-
-/**
- * Beam's customization of Calcite's {@link RelDataTypeSystemImpl}: it overrides the maximum
- * numeric precision and scale.
- */
-public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl {
-  public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem();
-
-  /** Upper bound reported for both numeric precision and numeric scale. */
-  private static final int MAX_NUMERIC_DIGITS = 38;
-
-  @Override
-  public int getMaxNumericPrecision() {
-    return MAX_NUMERIC_DIGITS;
-  }
-
-  @Override
-  public int getMaxNumericScale() {
-    return MAX_NUMERIC_DIGITS;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
deleted file mode 100644
index 552ff8f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.Iterator;
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.rule.BeamAggregationRule;
-import org.apache.beam.dsls.sql.rule.BeamFilterRule;
-import org.apache.beam.dsls.sql.rule.BeamIOSinkRule;
-import org.apache.beam.dsls.sql.rule.BeamIOSourceRule;
-import org.apache.beam.dsls.sql.rule.BeamIntersectRule;
-import org.apache.beam.dsls.sql.rule.BeamJoinRule;
-import org.apache.beam.dsls.sql.rule.BeamMinusRule;
-import org.apache.beam.dsls.sql.rule.BeamProjectRule;
-import org.apache.beam.dsls.sql.rule.BeamSortRule;
-import org.apache.beam.dsls.sql.rule.BeamUnionRule;
-import org.apache.beam.dsls.sql.rule.BeamValuesRule;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.tools.RuleSet;
-
-/**
- * The {@link RuleSet} handed to {@link BeamQueryPlanner}. Its rules rewrite a standard
- * Calcite {@link RelNode} tree into one represented with {@link BeamRelNode}.
- */
-public class BeamRuleSets {
-  private static final ImmutableSet<RelOptRule> CALCITE_TO_BEAM_CONVERSION_RULES =
-      ImmutableSet.<RelOptRule>builder()
-          .add(BeamIOSourceRule.INSTANCE)
-          .add(BeamProjectRule.INSTANCE)
-          .add(BeamFilterRule.INSTANCE)
-          .add(BeamIOSinkRule.INSTANCE)
-          .add(BeamAggregationRule.INSTANCE)
-          .add(BeamSortRule.INSTANCE)
-          .add(BeamValuesRule.INSTANCE)
-          .add(BeamIntersectRule.INSTANCE)
-          .add(BeamMinusRule.INSTANCE)
-          .add(BeamUnionRule.INSTANCE)
-          .add(BeamJoinRule.INSTANCE)
-          .build();
-
-  /** Returns a one-element array whose rule set holds a copy of the conversion rules. */
-  public static RuleSet[] getRuleSets() {
-    ImmutableSet<RelOptRule> rules =
-        ImmutableSet.<RelOptRule>builder().addAll(CALCITE_TO_BEAM_CONVERSION_RULES).build();
-    RuleSet conversionRuleSet = new BeamRuleSet(rules);
-    return new RuleSet[] {conversionRuleSet};
-  }
-
-  /** A {@link RuleSet} backed by an immutable set of rules. */
-  private static class BeamRuleSet implements RuleSet {
-    final ImmutableSet<RelOptRule> rules;
-
-    public BeamRuleSet(ImmutableSet<RelOptRule> rules) {
-      this.rules = rules;
-    }
-
-    public BeamRuleSet(ImmutableList<RelOptRule> rules) {
-      this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build();
-    }
-
-    @Override
-    public Iterator<RelOptRule> iterator() {
-      return rules.iterator();
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java
deleted file mode 100644
index 0506c5b..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.beam.dsls.sql.planner.BeamQueryPlanner} is the main interface.
- * It defines data sources, validate a SQL statement, and convert it as a Beam
- * pipeline.
- */
-package org.apache.beam.dsls.sql.planner;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
deleted file mode 100644
index 9dcb079..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rel;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.WithTimestamps;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Util;
-import org.joda.time.Duration;
-
-/**
- * {@link BeamRelNode} to replace a {@link Aggregate} node.
- *
- */
-public class BeamAggregationRel extends Aggregate implements BeamRelNode {
- private int windowFieldIdx = -1;
- private WindowFn<BeamSqlRow, BoundedWindow> windowFn;
- private Trigger trigger;
- private Duration allowedLatence = Duration.ZERO;
-
- public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits
- , RelNode child, boolean indicator,
- ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls
- , WindowFn windowFn, Trigger trigger, int windowFieldIdx, Duration allowedLatence) {
- super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
- this.windowFn = windowFn;
- this.trigger = trigger;
- this.windowFieldIdx = windowFieldIdx;
- this.allowedLatence = allowedLatence;
- }
-
- @Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this) + "_";
-
- PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
- if (windowFieldIdx != -1) {
- upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps
- .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
- .setCoder(upstream.getCoder());
- }
-
- PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window",
- Window.into(windowFn)
- .triggering(trigger)
- .withAllowedLateness(allowedLatence)
- .accumulatingFiredPanes());
-
- BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
- PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply(
- stageName + "exCombineBy",
- WithKeys
- .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
- windowFieldIdx, groupSet)))
- .setCoder(KvCoder.of(keyCoder, upstream.getCoder()));
-
-
- BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
-
- PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply(
- stageName + "combineBy",
- Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
- new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
- CalciteUtils.toBeamRowType(input.getRowType()))))
- .setCoder(KvCoder.of(keyCoder, aggCoder));
-
- PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
- ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
- CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx)));
- mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
- return mergedStream;
- }
-
- /**
- * Type of sub-rowrecord used as Group-By keys.
- */
- private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
- BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
- List<String> fieldNames = new ArrayList<>();
- List<Integer> fieldTypes = new ArrayList<>();
- for (int i : groupSet.asList()) {
- if (i != windowFieldIdx) {
- fieldNames.add(inputRowType.getFieldsName().get(i));
- fieldTypes.add(inputRowType.getFieldsType().get(i));
- }
- }
- return BeamSqlRowType.create(fieldNames, fieldTypes);
- }
-
- /**
- * Type of sub-rowrecord, that represents the list of aggregation fields.
- */
- private BeamSqlRowType exAggFieldsSchema() {
- List<String> fieldNames = new ArrayList<>();
- List<Integer> fieldTypes = new ArrayList<>();
- for (AggregateCall ac : getAggCallList()) {
- fieldNames.add(ac.name);
- fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
- }
-
- return BeamSqlRowType.create(fieldNames, fieldTypes);
- }
-
- @Override
- public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator
- , ImmutableBitSet groupSet,
- List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- return new BeamAggregationRel(getCluster(), traitSet, input, indicator
- , groupSet, groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLatence);
- }
-
- public void setWindowFn(WindowFn windowFn) {
- this.windowFn = windowFn;
- }
-
- public void setTrigger(Trigger trigger) {
- this.trigger = trigger;
- }
-
- public RelWriter explainTerms(RelWriter pw) {
- // We skip the "groups" element if it is a singleton of "group".
- pw.item("group", groupSet)
- .itemIf("window", windowFn, windowFn != null)
- .itemIf("trigger", trigger, trigger != null)
- .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
- .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
- .itemIf("indicator", indicator, indicator)
- .itemIf("aggs", aggCalls, pw.nest());
- if (!pw.nest()) {
- for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
- pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
- }
- }
- return pw;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
deleted file mode 100644
index f802104..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rel;
-
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
-import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.transform.BeamSqlFilterFn;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * BeamRelNode to replace a {@code Filter} node.
- *
- */
-public class BeamFilterRel extends Filter implements BeamRelNode {
-
- public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
- RexNode condition) {
- super(cluster, traits, child, condition);
- }
-
- @Override
- public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
- return new BeamFilterRel(getCluster(), traitSet, input, condition);
- }
-
- @Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this);
-
- PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
-
- BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
-
- PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
- ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
- filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
- return filterStream;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
deleted file mode 100644
index d70f94a..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rel;
-
-import com.google.common.base.Joiner;
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * BeamRelNode to replace a {@code TableModify} node.
- *
- */
-public class BeamIOSinkRel extends TableModify implements BeamRelNode {
- public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
- Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
- List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
- super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
- sourceExpressionList, flattened);
- }
-
- @Override
- public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs),
- getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
- }
-
- /**
- * Note that {@code BeamIOSinkRel} returns the input PCollection,
- * which is the persisted PCollection.
- */
- @Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this);
-
- PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
-
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
- BaseBeamTable targetTable = sqlEnv.findTable(sourceName);
-
- upstream.apply(stageName, targetTable.buildIOWriter());
-
- return upstream;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
deleted file mode 100644
index 6754991..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rel;
-
-import com.google.common.base.Joiner;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.TableScan;
-
-/**
- * BeamRelNode to replace a {@code TableScan} node.
- *
- */
-public class BeamIOSourceRel extends TableScan implements BeamRelNode {
-
- public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
- super(cluster, traitSet, table);
- }
-
- @Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
- TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName);
- if (inputPCollections.has(sourceTupleTag)) {
- //choose PCollection from input PCollectionTuple if exists there.
- PCollection<BeamSqlRow> sourceStream = inputPCollections
- .get(new TupleTag<BeamSqlRow>(sourceName));
- return sourceStream;
- } else {
- //If not, the source PColection is provided with BaseBeamTable.buildIOReader().
- BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
- return sourceTable.buildIOReader(inputPCollections.getPipeline())
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
deleted file mode 100644
index 7cab171..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Intersect;
-import org.apache.calcite.rel.core.SetOp;
-
-/**
- * {@code BeamRelNode} to replace a {@code Intersect} node.
- *
- * <p>This is used to combine two SELECT statements, but returns rows only from the
- * first SELECT statement that are identical to a row in the second SELECT statement.
- */
-public class BeamIntersectRel extends Intersect implements BeamRelNode {
- private BeamSetOperatorRelBase delegate;
- public BeamIntersectRel(
- RelOptCluster cluster,
- RelTraitSet traits,
- List<RelNode> inputs,
- boolean all) {
- super(cluster, traits, inputs, all);
- delegate = new BeamSetOperatorRelBase(this,
- BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
- }
-
- @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
- return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
deleted file mode 100644
index 3ebf152..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.transform.BeamJoinTransforms;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.util.Pair;
-
-/**
- * {@code BeamRelNode} to replace a {@code Join} node.
- *
- * <p>Support for join can be categorized into 3 cases:
- * <ul>
- * <li>BoundedTable JOIN BoundedTable</li>
- * <li>UnboundedTable JOIN UnboundedTable</li>
- * <li>BoundedTable JOIN UnboundedTable</li>
- * </ul>
- *
- * <p>For the first two cases, a standard join is utilized as long as the windowFn of the both
- * sides match.
- *
- * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some
- * constraints:
- *
- * <ul>
- * <li>{@code FULL OUTER JOIN} is not supported.</li>
- * <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should on the left side.</li>
- * <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should on the right side.</li>
- * </ul>
- *
- *
- * <p>There are also some general constraints:
- *
- * <ul>
- * <li>Only equi-join is supported.</li>
- * <li>CROSS JOIN is not supported.</li>
- * </ul>
- */
-public class BeamJoinRel extends Join implements BeamRelNode {
- public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
- RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
- super(cluster, traits, left, right, condition, variablesSet, joinType);
- }
-
- @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left,
- RelNode right, JoinRelType joinType, boolean semiJoinDone) {
- return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet,
- joinType);
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
- BeamSqlEnv sqlEnv)
- throws Exception {
- BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
- BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
- PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-
- final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
- PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-
- String stageName = BeamSqlRelUtils.getStageName(this);
- WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
- WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
-
- // extract the join fields
- List<Pair<Integer, Integer>> pairs = extractJoinColumns(
- leftRelNode.getRowType().getFieldCount());
-
- // build the extract key type
- // the name of the join field is not important
- List<String> names = new ArrayList<>(pairs.size());
- List<Integer> types = new ArrayList<>(pairs.size());
- for (int i = 0; i < pairs.size(); i++) {
- names.add("c" + i);
- types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
- }
- BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);
-
- Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
-
- // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows
- .apply(stageName + "_left_ExtractJoinFields",
- MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
- .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
-
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows
- .apply(stageName + "_right_ExtractJoinFields",
- MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
- .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
-
- // prepare the NullRows
- BeamSqlRow leftNullRow = buildNullRow(leftRelNode);
- BeamSqlRow rightNullRow = buildNullRow(rightRelNode);
-
- // a regular join
- if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
- && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
- || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
- && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) {
- try {
- leftWinFn.verifyCompatibility(rightWinFn);
- } catch (IncompatibleWindowException e) {
- throw new IllegalArgumentException(
- "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
- }
-
- return standardJoin(extractedLeftRows, extractedRightRows,
- leftNullRow, rightNullRow, stageName);
- } else if (
- (leftRows.isBounded() == PCollection.IsBounded.BOUNDED
- && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)
- || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
- && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
- ) {
- // if one of the sides is Bounded & the other is Unbounded
- // then do a sideInput join
- // when doing a sideInput join, the windowFn does not need to match
- // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be
- // the unbounded
- if (joinType == JoinRelType.FULL) {
- throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join "
- + "a bounded table with an unbounded table.");
- }
-
- if ((joinType == JoinRelType.LEFT
- && leftRows.isBounded() == PCollection.IsBounded.BOUNDED)
- || (joinType == JoinRelType.RIGHT
- && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
- throw new UnsupportedOperationException(
- "LEFT side of an OUTER JOIN must be Unbounded table.");
- }
-
- return sideInputJoin(extractedLeftRows, extractedRightRows,
- leftNullRow, rightNullRow);
- } else {
- throw new UnsupportedOperationException(
- "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn);
- }
- }
-
- private PCollection<BeamSqlRow> standardJoin(
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
- BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) {
- PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null;
- switch (joinType) {
- case LEFT:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow);
- break;
- case RIGHT:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow);
- break;
- case FULL:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow,
- rightNullRow);
- break;
- case INNER:
- default:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .innerJoin(extractedLeftRows, extractedRightRows);
- break;
- }
-
- PCollection<BeamSqlRow> ret = joinedRows
- .apply(stageName + "_JoinParts2WholeRow",
- MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
- return ret;
- }
-
- public PCollection<BeamSqlRow> sideInputJoin(
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
- BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) {
- // we always make the Unbounded table on the left to do the sideInput join
- // (will convert the result accordingly before return)
- boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED);
- JoinRelType realJoinType =
- (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType;
-
- PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows =
- swapped ? extractedRightRows : extractedLeftRows;
- PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows =
- swapped ? extractedLeftRows : extractedRightRows;
- BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow;
-
- // swapped still need to pass down because, we need to swap the result back.
- return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows,
- realRightNullRow, swapped);
- }
-
- private PCollection<BeamSqlRow> sideInputJoinHelper(
- JoinRelType joinType,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows,
- BeamSqlRow rightNullRow, boolean swapped) {
- final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows
- .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap());
-
- PCollection<BeamSqlRow> ret = leftRows
- .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
- joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
- return ret;
- }
-
- private BeamSqlRow buildNullRow(BeamRelNode relNode) {
- BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
- BeamSqlRow nullRow = new BeamSqlRow(leftType);
- for (int i = 0; i < leftType.size(); i++) {
- nullRow.addField(i, null);
- }
- return nullRow;
- }
-
- private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
- // it's a CROSS JOIN because: condition == true
- if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) {
- throw new UnsupportedOperationException("CROSS JOIN is not supported!");
- }
-
- RexCall call = (RexCall) condition;
- List<Pair<Integer, Integer>> pairs = new ArrayList<>();
- if ("AND".equals(call.getOperator().getName())) {
- List<RexNode> operands = call.getOperands();
- for (RexNode rexNode : operands) {
- Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount);
- pairs.add(pair);
- }
- } else if ("=".equals(call.getOperator().getName())) {
- pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
- } else {
- throw new UnsupportedOperationException(
- "Operator " + call.getOperator().getName() + " is not supported in join condition");
- }
-
- return pairs;
- }
-
- private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
- int leftRowColumnCount) {
- List<RexNode> operands = oneCondition.getOperands();
- final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(),
- ((RexInputRef) operands.get(1)).getIndex());
-
- final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(),
- ((RexInputRef) operands.get(1)).getIndex());
- final int rightIndex = rightIndex1 - leftRowColumnCount;
-
- return new Pair<>(leftIndex, rightIndex);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java
deleted file mode 100644
index 704a374..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-
-/**
- * Convention for Beam SQL.
- *
- * <p>A single-instance {@link Convention} named {@code BEAM_LOGICAL} whose relational
- * interface is {@link BeamRelNode}.
- */
-public enum BeamLogicalConvention implements Convention {
-  INSTANCE;
-
-  /** Returns the convention name, {@code "BEAM_LOGICAL"}. */
-  @Override
-  public String getName() {
-    return "BEAM_LOGICAL";
-  }
-
-  /** Same text as {@link #getName()}. */
-  @Override
-  public String toString() {
-    return getName();
-  }
-
-  /** Returns {@link BeamRelNode}, the interface associated with this convention. */
-  @Override
-  public Class getInterface() {
-    return BeamRelNode.class;
-  }
-
-  /** Returns the shared {@link ConventionTraitDef} instance. */
-  @Override
-  public RelTraitDef getTraitDef() {
-    return ConventionTraitDef.INSTANCE;
-  }
-
-  /** A trait satisfies this convention only when it is this very instance. */
-  @Override
-  public boolean satisfies(RelTrait trait) {
-    return trait == this;
-  }
-
-  /** Nothing is registered with the planner. */
-  @Override
-  public void register(RelOptPlanner planner) {
-    // intentionally empty
-  }
-
-  /** Always {@code false}. */
-  @Override
-  public boolean canConvertConvention(Convention toConvention) {
-    return false;
-  }
-
-  /** Always {@code false}. */
-  @Override
-  public boolean useAbstractConvertersForConversion(
-      RelTraitSet fromTraits, RelTraitSet toTraits) {
-    return false;
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
deleted file mode 100644
index b558f4b..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Minus;
-import org.apache.calcite.rel.core.SetOp;
-
-/**
- * {@code BeamRelNode} to replace a {@code Minus} node.
- *
- * <p>Corresponds to the SQL {@code EXCEPT} operator.
- */
-public class BeamMinusRel extends Minus implements BeamRelNode {
-
- private BeamSetOperatorRelBase delegate;
-
- public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
- boolean all) {
- super(cluster, traits, inputs, all);
- delegate = new BeamSetOperatorRelBase(this,
- BeamSetOperatorRelBase.OpType.MINUS, inputs, all);
- }
-
- @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
- return new BeamMinusRel(getCluster(), traitSet, inputs, all);
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
deleted file mode 100644
index 8f8e5ce..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rel;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
-import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.transform.BeamSqlProjectFn;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * {@code BeamRelNode} that stands in for a Calcite {@code Project} node.
- */
-public class BeamProjectRel extends Project implements BeamRelNode {
-
-  /**
-   * Creates a projection node.
-   *
-   * @param projects projected expressions: {@link RexLiteral}, {@link RexInputRef} or
-   *     {@link RexCall}
-   */
-  public BeamProjectRel(
-      RelOptCluster cluster,
-      RelTraitSet traits,
-      RelNode input,
-      List<? extends RexNode> projects,
-      RelDataType rowType) {
-    super(cluster, traits, input, projects, rowType);
-  }
-
-  /** Returns a new {@code BeamProjectRel} on this node's cluster with the given arguments. */
-  @Override
-  public Project copy(
-      RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
-    return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
-  }
-
-  /**
-   * Builds the upstream pipeline of the input node, then applies a {@link BeamSqlProjectFn}
-   * to it under this node's stage name and sets a {@link BeamSqlRowCoder} on the result.
-   */
-  @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(
-      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
-    final RelNode child = getInput();
-    final String stage = BeamSqlRelUtils.getStageName(this);
-
-    final PCollection<BeamSqlRow> source =
-        BeamSqlRelUtils.getBeamRelInput(child).buildBeamPipeline(inputPCollections, sqlEnv);
-
-    final BeamSqlExpressionExecutor exprExecutor = new BeamSqlFnExecutor(this);
-    final BeamSqlProjectFn projectFn =
-        new BeamSqlProjectFn(
-            getRelTypeName(), exprExecutor, CalciteUtils.toBeamRowType(rowType));
-
-    final PCollection<BeamSqlRow> projected = source.apply(stage, ParDo.of(projectFn));
-    projected.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-    return projected;
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
deleted file mode 100644
index d4c98a3..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rel;
-
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.rel.RelNode;
-
-/**
- * A {@link RelNode} that additionally knows how to build its part of a Beam pipeline via
- * {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)}.
- */
-public interface BeamRelNode extends RelNode {
-
-  /**
-   * Builds the pipeline for this node and returns the resulting rows.
-   *
-   * <p>A {@link BeamRelNode} is a recursive structure; the {@code BeamQueryPlanner} visits it
-   * with a DFS (depth-first search) algorithm.
-   *
-   * @param inputPCollections the input collections available to the pipeline
-   * @param sqlEnv the SQL environment in use
-   * @return the {@link PCollection} of rows produced by this node
-   * @throws Exception if the pipeline cannot be built
-   */
-  PCollection<BeamSqlRow> buildBeamPipeline(
-      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
deleted file mode 100644
index 939c9c8..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.rel.RelNode;
-
-/**
- * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel}
- * and {@code BeamMinusRel}.
- */
-public class BeamSetOperatorRelBase {
-  /**
-   * Set operator type.
-   */
-  public enum OpType implements Serializable {
-    UNION,
-    INTERSECT,
-    MINUS
-  }
-
-  // All state is assigned once in the constructor and never changed afterwards.
-  private final BeamRelNode beamRelNode;
-  private final List<RelNode> inputs;
-  private final boolean all;
-  private final OpType opType;
-
-  /**
-   * @param beamRelNode the node this delegate works for; used to derive the stage name
-   * @param opType which set operation to perform
-   * @param inputs the input nodes; input 0 is used as the left side, input 1 as the right
-   * @param all passed through to the filtering {@code DoFn}
-   */
-  public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType,
-      List<RelNode> inputs, boolean all) {
-    this.beamRelNode = beamRelNode;
-    this.opType = opType;
-    this.inputs = inputs;
-    this.all = all;
-  }
-
-  /**
-   * Builds the pipelines of the first two inputs, keys every row with
-   * {@code BeamSqlRow2KvFn}, co-groups both sides and applies
-   * {@code SetOperatorFilteringDoFn} to the grouped result.
-   *
-   * @return the rows emitted by the filtering {@code DoFn}
-   * @throws IllegalArgumentException if the window functions of the two inputs are not
-   *     compatible
-   */
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
-      BeamSqlEnv sqlEnv) throws Exception {
-    PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
-        .buildBeamPipeline(inputPCollections, sqlEnv);
-    PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
-        .buildBeamPipeline(inputPCollections, sqlEnv);
-
-    // Wildcard-typed instead of raw: only isCompatible(..) and toString() are needed here.
-    WindowFn<?, ?> leftWindow = leftRows.getWindowingStrategy().getWindowFn();
-    WindowFn<?, ?> rightWindow = rightRows.getWindowingStrategy().getWindowFn();
-    if (!leftWindow.isCompatible(rightWindow)) {
-      throw new IllegalArgumentException(
-          "inputs of " + opType + " have different window strategy: "
-              + leftWindow + " VS " + rightWindow);
-    }
-
-    final TupleTag<BeamSqlRow> leftTag = new TupleTag<>();
-    final TupleTag<BeamSqlRow> rightTag = new TupleTag<>();
-
-    // co-group: both sides are keyed by the row itself so equal rows meet in one group
-    String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
-    PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
-        .of(leftTag, leftRows.apply(
-            stageName + "_CreateLeftIndex", MapElements.via(
-                new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
-        .and(rightTag, rightRows.apply(
-            stageName + "_CreateRightIndex", MapElements.via(
-                new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
-        .apply(CoGroupByKey.<BeamSqlRow>create());
-
-    return coGbkResultCollection
-        .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag,
-            opType, all)));
-  }
-}