You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:31 UTC
[08/59] beam git commit: move dsls/sql to sdks/java/extensions/sql
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
new file mode 100644
index 0000000..cb6a523
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'OVERLAY' operator.
+ *
+ * <p>
+ * OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ])
+ * </p>
+ *
+ * <p>The result is the part of {@code string1} before the 1-based position {@code integer},
+ * followed by {@code string2}, followed by the part of {@code string1} that starts
+ * {@code integer2} characters after that position. When {@code FOR} is omitted, the length of
+ * {@code string2} is used.
+ */
+public class BeamSqlOverlayExpression extends BeamSqlExpression {
+  public BeamSqlOverlayExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  /**
+   * Accepts three or four operands: two character operands followed by one integer operand
+   * (the position) and an optional second integer operand (the length).
+   */
+  @Override public boolean accept() {
+    if (operands.size() < 3 || operands.size() > 4) {
+      return false;
+    }
+
+    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
+        || !SqlTypeName.CHAR_TYPES.contains(opType(1))
+        || !SqlTypeName.INT_TYPES.contains(opType(2))) {
+      return false;
+    }
+
+    if (operands.size() == 4 && !SqlTypeName.INT_TYPES.contains(opType(3))) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Evaluates the overlay for one row.
+   *
+   * <p>Positions that fall outside {@code string1} are clamped to its bounds instead of
+   * raising {@link StringIndexOutOfBoundsException}; inputs that were already in range
+   * produce the same result as before.
+   *
+   * @param inputRow the row the operands are evaluated against
+   * @return a VARCHAR primitive holding the overlaid string
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    String replaceStr = opValueEvaluated(1, inputRow);
+    // accept() allows every type in INT_TYPES, so read the integer operands as Number
+    // instead of unboxing them as Integer.
+    Number position = opValueEvaluated(2, inputRow);
+    // the index is 1 based.
+    int start = clamp(Math.max(position.longValue(), 1L) - 1L, str.length());
+
+    long length = replaceStr.length();
+    if (operands.size() == 4) {
+      Number requested = opValueEvaluated(3, inputRow);
+      length = requested.longValue();
+    }
+    // Cap the length before adding so the sum cannot overflow; the tail can never start
+    // beyond the end of the string anyway.
+    int tailStart = clamp(start + Math.min(length, (long) str.length()), str.length());
+
+    StringBuilder result = new StringBuilder(
+        start + replaceStr.length() + (str.length() - tailStart));
+    result.append(str, 0, start)
+        .append(replaceStr)
+        .append(str, tailStart, str.length());
+
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString());
+  }
+
+  /** Restricts {@code value} to the range {@code [0, max]}. */
+  private static int clamp(long value, int max) {
+    return (int) Math.max(0L, Math.min(value, (long) max));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
new file mode 100644
index 0000000..144acbf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * String position operator.
+ *
+ * <p>
+ * example:
+ * POSITION(string1 IN string2)
+ * POSITION(string1 IN string2 FROM integer)
+ * </p>
+ *
+ * <p>The value produced is exactly what {@link String#indexOf(String, int)} returns for
+ * {@code string2.indexOf(string1, integer)}: a 0-based index, or {@code -1} when
+ * {@code string1} does not occur.
+ *
+ * <p>NOTE(review): SQL POSITION is usually 1-based with 0 meaning "not found" - confirm
+ * whether the 0-based result is intended before relying on it.
+ */
+public class BeamSqlPositionExpression extends BeamSqlExpression {
+  public BeamSqlPositionExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.INTEGER);
+  }
+
+  /**
+   * Accepts two character operands, optionally followed by one integer operand.
+   */
+  @Override public boolean accept() {
+    final int operandCount = operands.size();
+    if (operandCount != 2 && operandCount != 3) {
+      return false;
+    }
+
+    boolean bothAreStrings = SqlTypeName.CHAR_TYPES.contains(opType(0))
+        && SqlTypeName.CHAR_TYPES.contains(opType(1));
+    if (!bothAreStrings) {
+      return false;
+    }
+
+    // The optional third operand is the start offset and must be an integer type.
+    return operandCount == 2 || SqlTypeName.INT_TYPES.contains(opType(2));
+  }
+
+  /**
+   * Looks up the first operand inside the second one.
+   *
+   * @param inputRow the row the operands are evaluated against
+   * @return an INTEGER primitive holding the index reported by {@code String.indexOf}
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String needle = opValueEvaluated(0, inputRow);
+    String haystack = opValueEvaluated(1, inputRow);
+
+    // Without a FROM operand the search offset stays at -1, which indexOf treats like 0.
+    int searchFrom = -1;
+    if (operands.size() == 3) {
+      Number requestedStart = opValueEvaluated(2, inputRow);
+      searchFrom = requestedStart.intValue();
+    }
+
+    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, haystack.indexOf(needle, searchFrom));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
new file mode 100644
index 0000000..d931db9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Common parent of the string operators that take exactly one operand.
+ */
+public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression {
+  public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * Accepts exactly one operand, which must be of a character type.
+   */
+  @Override public boolean accept() {
+    return operands.size() == 1 && SqlTypeName.CHAR_TYPES.contains(opType(0));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
new file mode 100644
index 0000000..8b33125
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'SUBSTRING' operator.
+ *
+ * <p>
+ * SUBSTRING(string FROM integer)
+ * SUBSTRING(string FROM integer FOR integer)
+ * </p>
+ *
+ * <p>A positive start position is 1-based, a negative one counts back from the end of the
+ * string, and a start position of 0 yields an empty string.
+ */
+public class BeamSqlSubstringExpression extends BeamSqlExpression {
+  public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  /**
+   * Accepts a character operand followed by one integer operand (the start position) and an
+   * optional second integer operand (the length).
+   */
+  @Override public boolean accept() {
+    if (operands.size() < 2 || operands.size() > 3) {
+      return false;
+    }
+
+    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
+        || !SqlTypeName.INT_TYPES.contains(opType(1))) {
+      return false;
+    }
+
+    if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Evaluates the substring for one row.
+   *
+   * <p>A start position that resolves to a point outside the string yields an empty string,
+   * in line with the handling of position 0, instead of raising
+   * {@link StringIndexOutOfBoundsException}. A negative length is treated as 0 and a length
+   * that runs past the end of the string is cut off at the end.
+   *
+   * @param inputRow the row the operands are evaluated against
+   * @return a VARCHAR primitive holding the selected part of the string
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    // accept() allows every type in INT_TYPES, so read the integer operands as Number
+    // instead of unboxing them as Integer.
+    Number position = opValueEvaluated(1, inputRow);
+    long idx = position.longValue();
+    if (idx == 0) {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
+    }
+
+    // NOTE: SQL substring is 1 based (rather than 0 based), and a negative index counts
+    // from the end of the string.
+    long resolved = idx > 0 ? idx - 1 : idx + str.length();
+    if (resolved < 0 || resolved > str.length()) {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
+    }
+    int startIdx = (int) resolved;
+
+    if (operands.size() == 3) {
+      Number requested = opValueEvaluated(2, inputRow);
+      // Bound the length to [0, str.length()] before adding so the sum cannot overflow.
+      long length = Math.min(Math.max(requested.longValue(), 0L), (long) str.length());
+      int endIdx = (int) Math.min(startIdx + length, (long) str.length());
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx, endIdx));
+    } else {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
new file mode 100644
index 0000000..5e6c2bb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Trim operator.
+ *
+ * <p>
+ * TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2)
+ * </p>
+ *
+ * <p>With a single operand the result is {@link String#trim()} of that operand. With three
+ * operands every repetition of {@code string1} is removed from the requested end(s) of
+ * {@code string2}.
+ */
+public class BeamSqlTrimExpression extends BeamSqlExpression {
+  public BeamSqlTrimExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  /**
+   * Accepts either one character operand, or a SYMBOL operand (the trim mode) followed by two
+   * character operands.
+   */
+  @Override public boolean accept() {
+    if (operands.size() != 1 && operands.size() != 3) {
+      return false;
+    }
+
+    if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) {
+      return false;
+    }
+
+    if (operands.size() == 3
+        && (
+        SqlTypeName.SYMBOL != opType(0)
+            || !SqlTypeName.CHAR_TYPES.contains(opType(1))
+            || !SqlTypeName.CHAR_TYPES.contains(opType(2)))
+        ) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Evaluates the trim for one row.
+   *
+   * @param inputRow the row the operands are evaluated against
+   * @return a VARCHAR primitive holding the trimmed string
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    if (operands.size() == 1) {
+      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
+          opValueEvaluated(0, inputRow).toString().trim());
+    } else {
+      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
+      String targetStr = opValueEvaluated(1, inputRow);
+      String containingStr = opValueEvaluated(2, inputRow);
+
+      switch (type) {
+        case LEADING:
+          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, leadingTrim(containingStr, targetStr));
+        case TRAILING:
+          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, trailingTrim(containingStr, targetStr));
+        case BOTH:
+        default:
+          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
+              trailingTrim(leadingTrim(containingStr, targetStr), targetStr));
+      }
+    }
+  }
+
+  /**
+   * Removes every repetition of {@code targetStr} from the start of {@code containingStr}.
+   *
+   * @return the remaining string; {@code containingStr} unchanged when {@code targetStr} is
+   *     empty
+   */
+  static String leadingTrim(String containingStr, String targetStr) {
+    // An empty trim string matches at every offset without advancing it, so the loop below
+    // would never terminate.
+    if (targetStr.isEmpty()) {
+      return containingStr;
+    }
+
+    int idx = 0;
+    while (containingStr.startsWith(targetStr, idx)) {
+      idx += targetStr.length();
+    }
+
+    return containingStr.substring(idx);
+  }
+
+  /**
+   * Removes every repetition of {@code targetStr} from the end of {@code containingStr}.
+   *
+   * @return the remaining string; {@code containingStr} unchanged when {@code targetStr} is
+   *     empty
+   */
+  static String trailingTrim(String containingStr, String targetStr) {
+    // Same guard as in leadingTrim: an empty trim string would loop forever.
+    if (targetStr.isEmpty()) {
+      return containingStr;
+    }
+
+    int idx = containingStr.length() - targetStr.length();
+    // startsWith returns false for a negative offset, which ends the loop once the whole
+    // string has been consumed.
+    while (containingStr.startsWith(targetStr, idx)) {
+      idx -= targetStr.length();
+    }
+
+    // Step back to the end of the last part that did not match.
+    idx += targetStr.length();
+    return containingStr.substring(0, idx);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
new file mode 100644
index 0000000..efa9c95
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.string;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * 'UPPER' operator.
+ *
+ * <p>Converts its single character operand to upper case.
+ */
+public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
+  public BeamSqlUpperExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.VARCHAR);
+  }
+
+  /**
+   * Upper-cases the operand.
+   *
+   * @param inputRow the row the operand is evaluated against
+   * @return a VARCHAR primitive holding the upper-cased string
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    String str = opValueEvaluated(0, inputRow);
+    // Use a fixed locale so the result does not depend on the JVM's default locale
+    // (for example the Turkish dotted/dotless i).
+    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase(java.util.Locale.ROOT));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java
new file mode 100644
index 0000000..f2c63f3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * String operators.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.string;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java
new file mode 100644
index 0000000..178d35f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The interpreter generates runnable 'code' to execute SQL relational expressions.
+ */
+package org.apache.beam.dsls.sql.interpreter;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java
new file mode 100644
index 0000000..b26e8c4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BeamSQL provides a new interface to run a SQL statement with Beam.
+ */
+package org.apache.beam.dsls.sql;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
new file mode 100644
index 0000000..93f9a2f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The core component to handle through a SQL statement, from explain execution plan,
+ * to generate a Beam pipeline.
+ *
+ */
+public class BeamQueryPlanner {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
+
+ protected final Planner planner;
+ private Map<String, BaseBeamTable> sourceTables = new HashMap<>();
+
+ public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+
+ public BeamQueryPlanner(SchemaPlus schema) {
+ final List<RelTraitDef> traitDefs = new ArrayList<>();
+ traitDefs.add(ConventionTraitDef.INSTANCE);
+ traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+ List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+ sqlOperatorTables.add(SqlStdOperatorTable.instance());
+ sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false,
+ Collections.<String>emptyList(), TYPE_FACTORY));
+
+ FrameworkConfig config = Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
+ .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
+ .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+ .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+ .build();
+ this.planner = Frameworks.getPlanner(config);
+
+ for (String t : schema.getTableNames()) {
+ sourceTables.put(t, (BaseBeamTable) schema.getTable(t));
+ }
+ }
+
+ /**
+ * Parse input SQL query, and return a {@link SqlNode} as grammar tree.
+ */
+ public SqlNode parseQuery(String sqlQuery) throws SqlParseException{
+ return planner.parse(sqlQuery);
+ }
+
+ /**
+ * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow,
+ * which is linked with the given {@code pipeline}. The final output stream is returned as
+ * {@code PCollection} so more operations can be applied.
+ */
+ public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline
+ , BeamSqlEnv sqlEnv) throws Exception {
+ BeamRelNode relNode = convertToBeamRel(sqlStatement);
+
+ // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel.
+ return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv);
+ }
+
+ /**
+ * It parses and validate the input query, then convert into a
+ * {@link BeamRelNode} tree.
+ *
+ */
+ public BeamRelNode convertToBeamRel(String sqlStatement)
+ throws ValidationException, RelConversionException, SqlParseException {
+ BeamRelNode beamRelNode;
+ try {
+ beamRelNode = (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
+ } finally {
+ planner.close();
+ }
+ return beamRelNode;
+ }
+
+ private RelNode validateAndConvert(SqlNode sqlNode)
+ throws ValidationException, RelConversionException {
+ SqlNode validated = validateNode(sqlNode);
+ LOG.info("SQL:\n" + validated);
+ RelNode relNode = convertToRelNode(validated);
+ return convertToBeamRel(relNode);
+ }
+
+ private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException {
+ RelTraitSet traitSet = relNode.getTraitSet();
+
+ LOG.info("SQLPlan>\n" + RelOptUtil.toString(relNode));
+
+ // PlannerImpl.transform() optimizes RelNode with ruleset
+ return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode);
+ }
+
+ private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
+ return planner.rel(sqlNode).rel;
+ }
+
+ private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
+ return planner.validate(sqlNode);
+ }
+
+ public Map<String, BaseBeamTable> getSourceTables() {
+ return sourceTables;
+ }
+
+ public Planner getPlanner() {
+ return planner;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
new file mode 100644
index 0000000..c89a740
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+/**
+ * Type system used by Beam SQL; it overrides the numeric limits of
+ * {@link RelDataTypeSystemImpl}.
+ */
+public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl {
+  /** Upper bound reported for both the precision and the scale of numeric types. */
+  private static final int MAX_NUMERIC_DIGITS = 38;
+
+  public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem();
+
+  /** Returns the largest numeric precision, 38. */
+  @Override
+  public int getMaxNumericPrecision() {
+    return MAX_NUMERIC_DIGITS;
+  }
+
+  /** Returns the largest numeric scale, 38. */
+  @Override
+  public int getMaxNumericScale() {
+    return MAX_NUMERIC_DIGITS;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
new file mode 100644
index 0000000..552ff8f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.Iterator;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.rule.BeamAggregationRule;
+import org.apache.beam.dsls.sql.rule.BeamFilterRule;
+import org.apache.beam.dsls.sql.rule.BeamIOSinkRule;
+import org.apache.beam.dsls.sql.rule.BeamIOSourceRule;
+import org.apache.beam.dsls.sql.rule.BeamIntersectRule;
+import org.apache.beam.dsls.sql.rule.BeamJoinRule;
+import org.apache.beam.dsls.sql.rule.BeamMinusRule;
+import org.apache.beam.dsls.sql.rule.BeamProjectRule;
+import org.apache.beam.dsls.sql.rule.BeamSortRule;
+import org.apache.beam.dsls.sql.rule.BeamUnionRule;
+import org.apache.beam.dsls.sql.rule.BeamValuesRule;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.tools.RuleSet;
+
+/**
+ * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard
+ * Calcite {@link RelNode} tree into one represented with {@link BeamRelNode}s.
+ */
+public class BeamRuleSets {
+  /** Rules that replace Calcite nodes with their Beam counterparts. */
+  private static final ImmutableSet<RelOptRule> CALCITE_TO_BEAM_CONVERSION_RULES =
+      ImmutableSet.<RelOptRule>builder()
+          .add(
+              BeamIOSourceRule.INSTANCE,
+              BeamProjectRule.INSTANCE,
+              BeamFilterRule.INSTANCE,
+              BeamIOSinkRule.INSTANCE,
+              BeamAggregationRule.INSTANCE,
+              BeamSortRule.INSTANCE,
+              BeamValuesRule.INSTANCE,
+              BeamIntersectRule.INSTANCE,
+              BeamMinusRule.INSTANCE,
+              BeamUnionRule.INSTANCE,
+              BeamJoinRule.INSTANCE)
+          .build();
+
+  /**
+   * Returns the rule sets to hand to the planner.
+   *
+   * @return a new one-element array holding the Calcite-to-Beam conversion rules
+   */
+  public static RuleSet[] getRuleSets() {
+    // The rule set is immutable, so it can be shared as is instead of being copied per call.
+    return new RuleSet[] {new BeamRuleSet(CALCITE_TO_BEAM_CONVERSION_RULES)};
+  }
+
+  /** {@link RuleSet} backed by an immutable set of rules. */
+  private static class BeamRuleSet implements RuleSet {
+    private final ImmutableSet<RelOptRule> rules;
+
+    public BeamRuleSet(ImmutableSet<RelOptRule> rules) {
+      this.rules = rules;
+    }
+
+    /** Builds the set from a list; duplicates collapse, first-seen order is kept. */
+    public BeamRuleSet(ImmutableList<RelOptRule> rules) {
+      this.rules = ImmutableSet.copyOf(rules);
+    }
+
+    @Override
+    public Iterator<RelOptRule> iterator() {
+      return rules.iterator();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java
new file mode 100644
index 0000000..0506c5b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/planner/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.beam.dsls.sql.planner.BeamQueryPlanner} is the main interface.
+ * It defines data sources, validates a SQL statement, and converts it into a Beam
+ * pipeline.
+ */
+package org.apache.beam.dsls.sql.planner;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
new file mode 100644
index 0000000..9dcb079
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.joda.time.Duration;
+
+/**
+ * {@link BeamRelNode} to replace a {@link Aggregate} node.
+ *
+ * <p>On top of the grouping and aggregate calls held by {@link Aggregate}, it carries the
+ * windowing settings ({@link WindowFn}, {@link Trigger} and allowed lateness) that are applied
+ * to the input before it is combined per key.
+ */
+public class BeamAggregationRel extends Aggregate implements BeamRelNode {
+  /** Index of the input field used as event timestamp; -1 when no field is designated. */
+  private int windowFieldIdx = -1;
+  private WindowFn<BeamSqlRow, BoundedWindow> windowFn;
+  private Trigger trigger;
+  private Duration allowedLateness = Duration.ZERO;
+
+  public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+      boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
+      List<AggregateCall> aggCalls, WindowFn windowFn, Trigger trigger, int windowFieldIdx,
+      Duration allowedLateness) {
+    super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+    this.windowFn = windowFn;
+    this.trigger = trigger;
+    this.windowFieldIdx = windowFieldIdx;
+    this.allowedLateness = allowedLateness;
+  }
+
+  /**
+   * Builds the aggregation on top of the pipeline produced by the input node.
+   *
+   * <p>Stages, in order: optional event-timestamp assignment (only when a window field is set),
+   * windowing, keying of each row, per-key combine of the aggregate calls, and a final step
+   * that turns every key/aggregate pair into one output row.
+   *
+   * @param inputPCollections collections handed down to the input node
+   * @param sqlEnv environment handed down to the input node
+   * @return the aggregated rows, encoded with the row type of this node
+   */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    RelNode input = getInput();
+    String stageName = BeamSqlRelUtils.getStageName(this) + "_";
+
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
+    if (windowFieldIdx != -1) {
+      // Re-stamp every row with the value of the designated field, keeping the input coder.
+      upstream = upstream
+          .apply(stageName + "assignEventTimestamp",
+              WithTimestamps.of(
+                  new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
+          .setCoder(upstream.getCoder());
+    }
+
+    PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window",
+        Window.into(windowFn)
+            .triggering(trigger)
+            .withAllowedLateness(allowedLateness)
+            .accumulatingFiredPanes());
+
+    BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream
+        .apply(stageName + "exCombineBy",
+            WithKeys.of(
+                new BeamAggregationTransforms.AggregationGroupByKeyFn(windowFieldIdx, groupSet)))
+        .setCoder(KvCoder.of(keyCoder, upstream.getCoder()));
+
+    BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream
+        .apply(stageName + "combineBy",
+            Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
+                new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
+                    CalciteUtils.toBeamRowType(input.getRowType()))))
+        .setCoder(KvCoder.of(keyCoder, aggCoder));
+
+    BeamSqlRowType outputRowType = CalciteUtils.toBeamRowType(getRowType());
+    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
+        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
+            outputRowType, getAggCallList(), windowFieldIdx)));
+    mergedStream.setCoder(new BeamSqlRowCoder(outputRowType));
+
+    return mergedStream;
+  }
+
+  /**
+   * Type of the sub-row used as Group-By key: every field of the group set except the
+   * window field.
+   */
+  private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
+    BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
+    List<String> inputNames = inputRowType.getFieldsName();
+    List<Integer> inputTypes = inputRowType.getFieldsType();
+
+    List<String> fieldNames = new ArrayList<>();
+    List<Integer> fieldTypes = new ArrayList<>();
+    for (int i : groupSet.asList()) {
+      if (i != windowFieldIdx) {
+        fieldNames.add(inputNames.get(i));
+        fieldTypes.add(inputTypes.get(i));
+      }
+    }
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
+  }
+
+  /**
+   * Type of the sub-row that holds the aggregation results, one field per aggregate call.
+   */
+  private BeamSqlRowType exAggFieldsSchema() {
+    List<String> fieldNames = new ArrayList<>();
+    List<Integer> fieldTypes = new ArrayList<>();
+    for (AggregateCall ac : getAggCallList()) {
+      fieldNames.add(ac.name);
+      fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
+    }
+
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
+  }
+
+  /** Copies this node, carrying the windowing settings over to the copy. */
+  @Override
+  public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator,
+      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
+      List<AggregateCall> aggCalls) {
+    return new BeamAggregationRel(getCluster(), traitSet, input, indicator,
+        groupSet, groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLateness);
+  }
+
+  public void setWindowFn(WindowFn windowFn) {
+    this.windowFn = windowFn;
+  }
+
+  public void setTrigger(Trigger trigger) {
+    this.trigger = trigger;
+  }
+
+  /**
+   * Describes this node: the input, the grouping, the windowing settings when present, and
+   * the aggregate calls.
+   */
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    // This override does not delegate to super, so the input has to be registered here, the
+    // way Calcite's single-input nodes do; otherwise it is absent from the terms of this node.
+    // We skip the "groups" element if it is a singleton of "group".
+    pw.input("input", getInput())
+        .item("group", groupSet)
+        .itemIf("window", windowFn, windowFn != null)
+        .itemIf("trigger", trigger, trigger != null)
+        .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
+        .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
+        .itemIf("indicator", indicator, indicator)
+        .itemIf("aggs", aggCalls, pw.nest());
+    if (!pw.nest()) {
+      for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
+        pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
+      }
+    }
+    return pw;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
new file mode 100644
index 0000000..f802104
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.transform.BeamSqlFilterFn;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * BeamRelNode to replace a {@code Filter} node.
+ *
+ * <p>The rows of the input are passed through a {@link BeamSqlFilterFn} that is driven by an
+ * expression executor created from this node.
+ */
+public class BeamFilterRel extends Filter implements BeamRelNode {
+
+  public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+      RexNode condition) {
+    super(cluster, traits, child, condition);
+  }
+
+  @Override
+  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+    return new BeamFilterRel(getCluster(), traitSet, input, condition);
+  }
+
+  /**
+   * Builds the pipeline of the input node and appends the filtering step to it.
+   *
+   * @return the filtered rows, encoded with the row type of this node
+   */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    RelNode child = getInput();
+    String filterStage = BeamSqlRelUtils.getStageName(this);
+
+    PCollection<BeamSqlRow> inputRows =
+        BeamSqlRelUtils.getBeamRelInput(child).buildBeamPipeline(inputPCollections, sqlEnv);
+
+    BeamSqlExpressionExecutor conditionExecutor = new BeamSqlFnExecutor(this);
+    BeamSqlFilterFn filterFn = new BeamSqlFilterFn(getRelTypeName(), conditionExecutor);
+
+    PCollection<BeamSqlRow> filteredRows = inputRows.apply(filterStage, ParDo.of(filterFn));
+    filteredRows.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+    return filteredRows;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
new file mode 100644
index 0000000..d70f94a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import com.google.common.base.Joiner;
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * BeamRelNode to replace a {@code TableModify} node.
+ *
+ * <p>The rows of the input are written with the IO writer of the table this node refers to.
+ */
+public class BeamIOSinkRel extends TableModify implements BeamRelNode {
+  public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
+      Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
+      List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+    super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
+        sourceExpressionList, flattened);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(),
+        sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(),
+        isFlattened());
+  }
+
+  /**
+   * Builds the pipeline of the input node and attaches the writer of the target table to it.
+   *
+   * <p>Note that {@code BeamIOSinkRel} returns the input PCollection,
+   * which is the persisted PCollection.
+   */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    RelNode child = getInput();
+    String writeStage = BeamSqlRelUtils.getStageName(this);
+
+    PCollection<BeamSqlRow> rowsToWrite =
+        BeamSqlRelUtils.getBeamRelInput(child).buildBeamPipeline(inputPCollections, sqlEnv);
+
+    // The table is looked up under its dot-joined qualified name.
+    String targetName = Joiner.on('.').join(getTable().getQualifiedName());
+    BaseBeamTable targetTable = sqlEnv.findTable(targetName);
+    rowsToWrite.apply(writeStage, targetTable.buildIOWriter());
+
+    return rowsToWrite;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
new file mode 100644
index 0000000..6754991
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import com.google.common.base.Joiner;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.TableScan;
+
+/**
+ * BeamRelNode to replace a {@code TableScan} node.
+ *
+ * <p>The rows come either from the caller-supplied {@link PCollectionTuple} or, when the
+ * tuple has no entry for the table, from the IO reader of the table found in the environment.
+ */
+public class BeamIOSourceRel extends TableScan implements BeamRelNode {
+
+  public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+    super(cluster, traitSet, table);
+  }
+
+  /**
+   * Resolves the rows of the scanned table.
+   *
+   * @param inputPCollections collections supplied by the caller, tagged by the dot-joined
+   *     qualified table name
+   * @param sqlEnv environment used to look the table up when the tuple has no entry for it
+   */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    String tableName = Joiner.on('.').join(getTable().getQualifiedName());
+    TupleTag<BeamSqlRow> tableTag = new TupleTag<>(tableName);
+
+    if (!inputPCollections.has(tableTag)) {
+      // Not supplied by the caller: read through BaseBeamTable.buildIOReader().
+      BaseBeamTable registeredTable = sqlEnv.findTable(tableName);
+      return registeredTable.buildIOReader(inputPCollections.getPipeline())
+          .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+    }
+
+    // Choose the PCollection from the input PCollectionTuple when it exists there.
+    return inputPCollections.get(tableTag);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
new file mode 100644
index 0000000..7cab171
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.SetOp;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Intersect} node.
+ *
+ * <p>This is used to combine two SELECT statements, but returns rows only from the
+ * first SELECT statement that are identical to a row in the second SELECT statement.
+ *
+ * <p>Building the pipeline is handed over to a {@link BeamSetOperatorRelBase} configured
+ * for {@code INTERSECT}.
+ */
+public class BeamIntersectRel extends Intersect implements BeamRelNode {
+  /** Helper that builds the pipeline for this set operation. */
+  private BeamSetOperatorRelBase delegate;
+
+  public BeamIntersectRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+      boolean all) {
+    super(cluster, traits, inputs, all);
+    this.delegate = new BeamSetOperatorRelBase(
+        this, BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
+  }
+
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
new file mode 100644
index 0000000..3ebf152
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.transform.BeamJoinTransforms;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Join} node.
+ *
+ * <p>Support for join can be categorized into 3 cases:
+ * <ul>
+ * <li>BoundedTable JOIN BoundedTable</li>
+ * <li>UnboundedTable JOIN UnboundedTable</li>
+ * <li>BoundedTable JOIN UnboundedTable</li>
+ * </ul>
+ *
+ * <p>For the first two cases, a standard join is utilized as long as the windowFns of both
+ * sides match.
+ *
+ * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some
+ * constraints:
+ *
+ * <ul>
+ * <li>{@code FULL OUTER JOIN} is not supported.</li>
+ * <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should be on the left side.</li>
+ * <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should be on the right side.</li>
+ * </ul>
+ *
+ *
+ * <p>There are also some general constraints:
+ *
+ * <ul>
+ * <li>Only equi-join is supported.</li>
+ * <li>CROSS JOIN is not supported.</li>
+ * </ul>
+ */
+public class BeamJoinRel extends Join implements BeamRelNode {
+ /**
+  * Creates a Beam join node by forwarding every argument, unchanged, to the Calcite
+  * {@code Join} super constructor.
+  */
+ public BeamJoinRel(
+     RelOptCluster cluster,
+     RelTraitSet traits,
+     RelNode left,
+     RelNode right,
+     RexNode condition,
+     Set<CorrelationId> variablesSet,
+     JoinRelType joinType) {
+   super(cluster, traits, left, right, condition, variablesSet, joinType);
+ }
+
+ /**
+  * Creates a copy of this join on the same cluster.
+  *
+  * <p>The {@code semiJoinDone} argument is not used here; {@code variablesSet} is carried
+  * over from this node.
+  */
+ @Override
+ public Join copy(
+     RelTraitSet traitSet,
+     RexNode conditionExpr,
+     RelNode left,
+     RelNode right,
+     JoinRelType joinType,
+     boolean semiJoinDone) {
+   return new BeamJoinRel(
+       getCluster(), traitSet, left, right, conditionExpr, variablesSet, joinType);
+ }
+
+ @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+ BeamSqlEnv sqlEnv)
+ throws Exception {
+ BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+ BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
+ PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+
+ final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+ PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+
+ String stageName = BeamSqlRelUtils.getStageName(this);
+ WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
+ WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
+
+ // extract the join fields
+ List<Pair<Integer, Integer>> pairs = extractJoinColumns(
+ leftRelNode.getRowType().getFieldCount());
+
+ // build the extract key type
+ // the name of the join field is not important
+ List<String> names = new ArrayList<>(pairs.size());
+ List<Integer> types = new ArrayList<>(pairs.size());
+ for (int i = 0; i < pairs.size(); i++) {
+ names.add("c" + i);
+ types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
+ }
+ BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);
+
+ Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
+
+ // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
+ PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows
+ .apply(stageName + "_left_ExtractJoinFields",
+ MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
+ .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
+
+ PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows
+ .apply(stageName + "_right_ExtractJoinFields",
+ MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
+ .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
+
+ // prepare the NullRows
+ BeamSqlRow leftNullRow = buildNullRow(leftRelNode);
+ BeamSqlRow rightNullRow = buildNullRow(rightRelNode);
+
+ // a regular join
+ if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
+ && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
+ || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
+ && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) {
+ try {
+ leftWinFn.verifyCompatibility(rightWinFn);
+ } catch (IncompatibleWindowException e) {
+ throw new IllegalArgumentException(
+ "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
+ }
+
+ return standardJoin(extractedLeftRows, extractedRightRows,
+ leftNullRow, rightNullRow, stageName);
+ } else if (
+ (leftRows.isBounded() == PCollection.IsBounded.BOUNDED
+ && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)
+ || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
+ && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
+ ) {
+ // if one of the sides is Bounded & the other is Unbounded
+ // then do a sideInput join
+ // when doing a sideInput join, the windowFn does not need to match
+ // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be
+ // the unbounded
+ if (joinType == JoinRelType.FULL) {
+ throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join "
+ + "a bounded table with an unbounded table.");
+ }
+
+ if ((joinType == JoinRelType.LEFT
+ && leftRows.isBounded() == PCollection.IsBounded.BOUNDED)
+ || (joinType == JoinRelType.RIGHT
+ && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
+ throw new UnsupportedOperationException(
+ "LEFT side of an OUTER JOIN must be Unbounded table.");
+ }
+
+ return sideInputJoin(extractedLeftRows, extractedRightRows,
+ leftNullRow, rightNullRow);
+ } else {
+ throw new UnsupportedOperationException(
+ "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn);
+ }
+ }
+
+ private PCollection<BeamSqlRow> standardJoin(
+ PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
+ PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
+ BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) {
+ PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null;
+ switch (joinType) {
+ case LEFT:
+ joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+ .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow);
+ break;
+ case RIGHT:
+ joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+ .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow);
+ break;
+ case FULL:
+ joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+ .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow,
+ rightNullRow);
+ break;
+ case INNER:
+ default:
+ joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+ .innerJoin(extractedLeftRows, extractedRightRows);
+ break;
+ }
+
+ PCollection<BeamSqlRow> ret = joinedRows
+ .apply(stageName + "_JoinParts2WholeRow",
+ MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
+ .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+ return ret;
+ }
+
+ /**
+  * Joins a bounded input with an unbounded one.
+  *
+  * <p>The unbounded input always drives the join. When the left input is the bounded one,
+  * the two sides are exchanged before delegating, and the {@code swapped} flag is passed
+  * down so that the helper's DoFn can account for it.
+  */
+ public PCollection<BeamSqlRow> sideInputJoin(
+     PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
+     PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
+     BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) {
+   final boolean swapped = extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED;
+
+   if (!swapped) {
+     // the unbounded input is already on the left: join as requested
+     return sideInputJoinHelper(
+         joinType, extractedLeftRows, extractedRightRows, rightNullRow, false);
+   }
+
+   // sides are exchanged: an INNER join stays INNER, everything else is run as a LEFT join
+   JoinRelType effectiveJoinType =
+       (joinType == JoinRelType.INNER) ? joinType : JoinRelType.LEFT;
+   return sideInputJoinHelper(
+       effectiveJoinType, extractedRightRows, extractedLeftRows, leftNullRow, true);
+ }
+
+ /**
+  * Turns {@code rightRows} into a multimap side input and runs a
+  * {@code SideInputJoinDoFn} over {@code leftRows} with that view attached.
+  *
+  * <p>The result is given a coder built from this node's row type.
+  */
+ private PCollection<BeamSqlRow> sideInputJoinHelper(
+     JoinRelType joinType,
+     PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows,
+     PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows,
+     BeamSqlRow rightNullRow, boolean swapped) {
+   // join key -> all right-hand rows carrying that key
+   final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> lookupView =
+       rightRows.apply(View.<BeamSqlRow, BeamSqlRow>asMultimap());
+
+   return leftRows
+       .apply(
+           ParDo.of(
+               new BeamJoinTransforms.SideInputJoinDoFn(
+                   joinType, rightNullRow, lookupView, swapped))
+               .withSideInputs(lookupView))
+       .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+ }
+
+ private BeamSqlRow buildNullRow(BeamRelNode relNode) {
+ BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
+ BeamSqlRow nullRow = new BeamSqlRow(leftType);
+ for (int i = 0; i < leftType.size(); i++) {
+ nullRow.addField(i, null);
+ }
+ return nullRow;
+ }
+
+ /**
+  * Extracts the column pairs compared by the join condition.
+  *
+  * <p>The condition must be a single {@code =} comparison or an {@code AND} (possibly
+  * nested) of {@code =} comparisons. Every conjunct is validated, so that a non-equality
+  * predicate inside an {@code AND} is rejected instead of being treated as an equality.
+  *
+  * @param leftRowColumnCount number of columns of the left input, handed on to
+  *     {@code extractOneJoinColumn} for each equality
+  * @return one pair per equality, in the order the equalities appear in the condition
+  * @throws UnsupportedOperationException if the condition is the literal {@code true}
+  *     (a CROSS JOIN), is not a call, or uses an operator other than {@code AND} / {@code =}
+  */
+ private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
+   // it's a CROSS JOIN because: condition == true
+   // (Boolean.TRUE.equals also copes with null or non-boolean literal values)
+   if (condition instanceof RexLiteral
+       && Boolean.TRUE.equals(((RexLiteral) condition).getValue())) {
+     throw new UnsupportedOperationException("CROSS JOIN is not supported!");
+   }
+
+   List<Pair<Integer, Integer>> pairs = new ArrayList<>();
+   collectJoinColumns(condition, leftRowColumnCount, pairs);
+   return pairs;
+ }
+
+ /** Adds the column pair of every equality found in {@code node} to {@code pairs}. */
+ private void collectJoinColumns(RexNode node, int leftRowColumnCount,
+     List<Pair<Integer, Integer>> pairs) {
+   if (!(node instanceof RexCall)) {
+     throw new UnsupportedOperationException(
+         "Expression " + node + " is not supported in join condition");
+   }
+
+   RexCall call = (RexCall) node;
+   String operatorName = call.getOperator().getName();
+   if ("AND".equals(operatorName)) {
+     for (RexNode operand : call.getOperands()) {
+       collectJoinColumns(operand, leftRowColumnCount, pairs);
+     }
+   } else if ("=".equals(operatorName)) {
+     pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
+   } else {
+     throw new UnsupportedOperationException(
+         "Operator " + operatorName + " is not supported in join condition");
+   }
+ }
+
+ /**
+  * Converts one equality of the join condition into a pair of column indices.
+  *
+  * <p>The smaller of the two referenced indices is taken as the left column; the larger one,
+  * reduced by {@code leftRowColumnCount}, is taken as the right column.
+  *
+  * @param oneCondition a call whose two operands are expected to be column references
+  * @param leftRowColumnCount number of columns of the left input
+  * @return (index in the left input, index in the right input)
+  * @throws UnsupportedOperationException if an operand is not a plain column reference, or
+  *     if the two columns do not come from opposite inputs
+  */
+ private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
+     int leftRowColumnCount) {
+   List<RexNode> operands = oneCondition.getOperands();
+   if (operands.size() != 2
+       || !(operands.get(0) instanceof RexInputRef)
+       || !(operands.get(1) instanceof RexInputRef)) {
+     throw new UnsupportedOperationException(
+         "Join condition " + oneCondition + " is not supported: "
+             + "both sides of '=' must be column references");
+   }
+
+   final int firstIndex = ((RexInputRef) operands.get(0)).getIndex();
+   final int secondIndex = ((RexInputRef) operands.get(1)).getIndex();
+
+   final int leftIndex = Math.min(firstIndex, secondIndex);
+   final int rightIndex = Math.max(firstIndex, secondIndex) - leftRowColumnCount;
+
+   // the smaller index has to fall into the left input and the larger one into the right
+   // input, otherwise both columns belong to the same side of the join
+   if (leftIndex >= leftRowColumnCount || rightIndex < 0) {
+     throw new UnsupportedOperationException(
+         "Join condition " + oneCondition + " is not supported: "
+             + "it must compare a column of the left input with a column of the right input");
+   }
+
+   return new Pair<>(leftIndex, rightIndex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java
new file mode 100644
index 0000000..704a374
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamLogicalConvention.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+
+/**
+ * Convention for Beam SQL.
+ *
+ */
+public enum BeamLogicalConvention implements Convention {
+  /** The only instance of this convention. */
+  INSTANCE;
+
+  /** Returns {@code BeamRelNode.class}. */
+  @Override
+  public Class getInterface() {
+    return BeamRelNode.class;
+  }
+
+  /** Returns the fixed name {@code "BEAM_LOGICAL"}. */
+  @Override
+  public String getName() {
+    return "BEAM_LOGICAL";
+  }
+
+  /** Returns {@link ConventionTraitDef#INSTANCE}. */
+  @Override
+  public RelTraitDef getTraitDef() {
+    return ConventionTraitDef.INSTANCE;
+  }
+
+  /** A trait satisfies this convention only when it is this very instance. */
+  @Override
+  public boolean satisfies(RelTrait trait) {
+    return trait == this;
+  }
+
+  /** No-op: this convention registers nothing with the planner. */
+  @Override
+  public void register(RelOptPlanner planner) {
+  }
+
+  /** Same text as {@link #getName()}. */
+  @Override
+  public String toString() {
+    return getName();
+  }
+
+  /** Always {@code false}, whatever the target convention is. */
+  @Override
+  public boolean canConvertConvention(Convention toConvention) {
+    return false;
+  }
+
+  /** Always {@code false}, whatever the trait sets are. */
+  @Override
+  public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
new file mode 100644
index 0000000..b558f4b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.SetOp;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Minus} node.
+ *
+ * <p>Corresponds to the SQL {@code EXCEPT} operator.
+ */
+public class BeamMinusRel extends Minus implements BeamRelNode {
+
+ private BeamSetOperatorRelBase delegate;
+
+ public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+ boolean all) {
+ super(cluster, traits, inputs, all);
+ delegate = new BeamSetOperatorRelBase(this,
+ BeamSetOperatorRelBase.OpType.MINUS, inputs, all);
+ }
+
+ @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+ return new BeamMinusRel(getCluster(), traitSet, inputs, all);
+ }
+
+ @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+ , BeamSqlEnv sqlEnv) throws Exception {
+ return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
new file mode 100644
index 0000000..8f8e5ce
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.transform.BeamSqlProjectFn;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * BeamRelNode to replace a {@code Project} node.
+ *
+ */
+public class BeamProjectRel extends Project implements BeamRelNode {
+
+  /**
+   * Creates a projection node by forwarding all arguments to the Calcite {@code Project}
+   * super constructor.
+   *
+   * <p>projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}.
+   */
+  public BeamProjectRel(
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      RelNode input,
+      List<? extends RexNode> projects,
+      RelDataType rowType) {
+    super(cluster, traits, input, projects, rowType);
+  }
+
+  /** Returns a new {@code BeamProjectRel} on the same cluster, built from the arguments. */
+  @Override
+  public Project copy(
+      RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
+    return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
+  }
+
+  /**
+   * Builds the input's pipeline and applies a {@code BeamSqlProjectFn} on top of it.
+   *
+   * <p>The projection is driven by a {@code BeamSqlFnExecutor} created from this node, and
+   * the result is given a coder built from this node's row type.
+   */
+  @Override
+  public PCollection<BeamSqlRow> buildBeamPipeline(
+      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
+    RelNode child = getInput();
+    String stageName = BeamSqlRelUtils.getStageName(this);
+
+    // build everything below this node first
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(child).buildBeamPipeline(inputPCollections, sqlEnv);
+
+    BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
+
+    PCollection<BeamSqlRow> projected =
+        upstream.apply(
+            stageName,
+            ParDo.of(
+                new BeamSqlProjectFn(
+                    getRelTypeName(), executor, CalciteUtils.toBeamRowType(rowType))));
+    return projected.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
new file mode 100644
index 0000000..d4c98a3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added.
+ */
+public interface BeamRelNode extends RelNode {
+
+ /**
+ * Builds the part of the Beam pipeline that corresponds to this node and returns its output.
+ *
+ * <p>A {@link BeamRelNode} is a recursive structure, the
+ * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
+ * algorithm.
+ *
+ * @param inputPCollections input collections; implementations that have input nodes hand
+ * the same tuple on when building those inputs
+ * @param sqlEnv SQL environment; handed on to input nodes in the same way
+ * @return the {@code PCollection} of rows produced by this node
+ * @throws Exception if building the pipeline for this node or one of its inputs fails
+ */
+ PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
+ throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
new file mode 100644
index 0000000..939c9c8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel}
+ * and {@code BeamMinusRel}.
+ */
+public class BeamSetOperatorRelBase {
+  /**
+   * Set operator type.
+   */
+  public enum OpType implements Serializable {
+    UNION,
+    INTERSECT,
+    MINUS
+  }
+
+  private final BeamRelNode beamRelNode;
+  private final List<RelNode> inputs;
+  private final boolean all;
+  private final OpType opType;
+
+  /**
+   * Creates a delegate for one set-operator node.
+   *
+   * @param beamRelNode the node this delegate works for; used to derive the stage name
+   * @param opType which set operation to perform
+   * @param inputs the inputs of the set operator; exactly two are required when the pipeline
+   *     is built
+   * @param all flag handed to the filtering DoFn together with {@code opType}
+   */
+  public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType,
+      List<RelNode> inputs, boolean all) {
+    this.beamRelNode = beamRelNode;
+    this.opType = opType;
+    this.inputs = inputs;
+    this.all = all;
+  }
+
+  /**
+   * Builds the pipeline of both inputs, co-groups their rows and filters the groups according
+   * to the set operator.
+   *
+   * @param inputPCollections passed through unchanged when building both inputs
+   * @param sqlEnv passed through unchanged when building both inputs
+   * @return the rows emitted by the filtering DoFn
+   * @throws UnsupportedOperationException if the operator does not have exactly two inputs
+   * @throws IllegalArgumentException if the windowFns of the two inputs are not compatible
+   */
+  public PCollection<BeamSqlRow> buildBeamPipeline(
+      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
+    // only two inputs are consumed below; refuse anything else instead of silently dropping
+    // inputs or failing with an index error
+    if (inputs.size() != 2) {
+      throw new UnsupportedOperationException(
+          opType + " requires exactly 2 inputs, but got " + inputs.size());
+    }
+
+    PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
+        .buildBeamPipeline(inputPCollections, sqlEnv);
+    PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
+        .buildBeamPipeline(inputPCollections, sqlEnv);
+
+    WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
+    WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
+    if (!leftWindow.isCompatible(rightWindow)) {
+      throw new IllegalArgumentException(
+          "inputs of " + opType + " have different window strategy: "
+              + leftWindow + " VS " + rightWindow);
+    }
+
+    final TupleTag<BeamSqlRow> leftTag = new TupleTag<>();
+    final TupleTag<BeamSqlRow> rightTag = new TupleTag<>();
+
+    // co-group: key both sides with BeamSqlRow2KvFn, then group them by that key
+    String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
+    PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
+        .of(leftTag, leftRows.apply(
+            stageName + "_CreateLeftIndex", MapElements.via(
+                new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
+        .and(rightTag, rightRows.apply(
+            stageName + "_CreateRightIndex", MapElements.via(
+                new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
+        .apply(CoGroupByKey.<BeamSqlRow>create());
+
+    // the DoFn decides, per group, what to emit for this opType / all combination
+    return coGbkResultCollection
+        .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag,
+            opType, all)));
+  }
+}