You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/10 12:51:47 UTC

[GitHub] [flink] HuangXingBo opened a new pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

HuangXingBo opened a new pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051
 
 
   ## What is the purpose of the change
   
   *This pull request introduce Python Physical Correlate RelNodes which are containers for Python TableFunction*
   
   ## Brief change log
   
     - *Add Physical Python Correlate Rule `BatchExecPythonCorrelateRule` ,`StreamExecPythonCorrelateRule` and `DataStreamPythonCorrelateRule`*
     - *Add Physical Python Correlate Node `DataStreamPythonCorrelate`, `StreamExecPythonCorrelate` and `BatchExecPythonCorrelate`*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added ut case testCorrelatePythonTableFunction*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not applicable)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r380120642
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelateBase.scala
 ##########
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * Base RelNode for data stream correlate.
+  */
+abstract class DataStreamCorrelateBase(
 
 Review comment:
   Hi @hequn8128, thanks for the notice. Sure, in such cases we can keep classes as Scala classes as a short term solution. But we should eliminate those shortcomings in the near future. E.g. by adding a refactoring commit that unblocks new code from implementing it in Java. In any case we should create a follow-up issue and find someone to address this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378823281
 
 

 ##########
 File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
 ##########
 @@ -172,6 +173,25 @@ class PojoUser() {
   }
 }
 
+class PythonTableFunction extends TableFunction[Row] with PythonFunction {
+
+  def eval(x: Int, y: Int): Unit = {
+    for (i <- 0 until y) {
+      val row = new Row(2)
+      row.setField(0, x)
+      row.setField(1, i * i)
+      collect(row)
 
 Review comment:
   How about remove these lines?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-587302161
 
 
   @flinkbot run travis

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379227379
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelateBase.scala
 ##########
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * Base RelNode for data stream correlate.
+  */
+abstract class DataStreamCorrelateBase(
 
 Review comment:
   Hi @twalthr It seems we can't use the implemented methods of a Scala trait from Java([see details](https://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods)) which prevents us from turning this class to a Java one. For this class, it needs to extend two traits, i.e., CommonCorrelate and DataStreamRel. Considering this, can we keep this class as a Scala one for now?
   
   I have checked that the classes of Rule has been implemented in Java in this PR. 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149164808 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   * 67f8b75918a21c604569ef75f99663c08a26fcdf Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149164808) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220) 
   * f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379228560
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/datastream/DataStreamPythonCorrelateRule.java
 ##########
 @@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream;
+
+import org.apache.flink.table.plan.nodes.FlinkConventions;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamPythonCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.plan.schema.RowSchema;
+import org.apache.flink.table.plan.util.CorrelateUtil;
+import org.apache.flink.table.plan.util.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
+ * {@link DataStreamPythonCorrelate}.
+ */
+public class DataStreamPythonCorrelateRule extends ConverterRule {
+
+	public static final RelOptRule INSTANCE = new DataStreamPythonCorrelateRule();
+
+	private DataStreamPythonCorrelateRule() {
+		super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.DATASTREAM(),
+			"DataStreamPythonCorrelateRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		FlinkLogicalCorrelate join = call.rel(0);
+		RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+		if (right instanceof FlinkLogicalTableFunctionScan) {
+			// right node is a python table function
 
 Review comment:
   The right node may not be a python table function.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149164808 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230 TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149250281 TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/149250281 TriggerType:MANUAL TriggerID:587302161
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230 TriggerType:MANUAL TriggerID:587302161
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   * 67f8b75918a21c604569ef75f99663c08a26fcdf Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149164808) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220) 
   * f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149250281) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149164808 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230 TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149250281 TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   * 67f8b75918a21c604569ef75f99663c08a26fcdf Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149164808) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220) 
   * f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149250281) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378836618
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelateBase.scala
 ##########
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * Base RelNode for data stream correlate.
+  */
+abstract class DataStreamCorrelateBase(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputSchema: RowSchema,
+    input: RelNode,
+    scan: FlinkLogicalTableFunctionScan,
+    condition: Option[RexNode],
+    schema: RowSchema,
+    joinSchema: RowSchema,
 
 Review comment:
   Remove this member. It has never been used.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149164808 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230 TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149250281 TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/149250281 TriggerType:MANUAL TriggerID:587302161
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230 TriggerType:MANUAL TriggerID:587302161
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   * 67f8b75918a21c604569ef75f99663c08a26fcdf Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149164808) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220) 
   * f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149250281) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] HuangXingBo commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-586696380
 
 
   Thanks a lot for @hequn8128 review, I have addressed the comments at the latest commit.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379234158
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
 ##########
 @@ -113,6 +114,25 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
   }
 }
 
+class PythonTableFunction extends TableFunction[Row] with PythonFunction {
 
 Review comment:
   How about renaming it to MockPythonTableFunction which is more clear in case of any confusion.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379244079
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java
 ##########
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
+ * {@link StreamExecPythonCorrelate}.
+ */
+public class StreamExecPythonCorrelateRule extends ConverterRule {
+
+	public static final RelOptRule INSTANCE = new StreamExecPythonCorrelateRule();
+
+	private StreamExecPythonCorrelateRule() {
+		super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.STREAM_PHYSICAL(),
+			"StreamExecPythonCorrelateRule");
+	}
+
+	// find only calc and table function
+	private boolean findTableFunction(FlinkLogicalCalc calc) {
+		RelNode child = ((RelSubset) calc.getInput()).getOriginal();
+		if (child instanceof FlinkLogicalTableFunctionScan) {
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) child;
+			return PythonUtil.isPythonCall(scan.getCall());
+		} else if (child instanceof FlinkLogicalCalc) {
+			FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child;
+			return findTableFunction(childCalc);
+		}
+		return false;
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		FlinkLogicalCorrelate correlate = call.rel(0);
+		RelNode right = ((RelSubset) correlate.getRight()).getOriginal();
+		if (right instanceof FlinkLogicalTableFunctionScan) {
+			// right node is a python table function
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right;
+			return PythonUtil.isPythonCall(scan.getCall());
+		} else if (right instanceof FlinkLogicalCalc) {
+			// a filter is pushed above the table function
+			return findTableFunction((FlinkLogicalCalc) right);
+		}
+		return false;
+	}
+
+	@Override
+	public RelNode convert(RelNode relNode) {
+		StreamExecPythonCorrelateFactory factory = new StreamExecPythonCorrelateFactory(relNode);
+		return factory.convertToCorrelate();
+	}
+
+	/**
+	 * The factory is responsible to creating {@link StreamExecPythonCorrelate}.
+	 */
+	private static class StreamExecPythonCorrelateFactory {
+		private final RelNode correlateRel;
 
 Review comment:
   Remove this member.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379227379
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelateBase.scala
 ##########
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * Base RelNode for data stream correlate.
+  */
+abstract class DataStreamCorrelateBase(
 
 Review comment:
   Hi @twalthr It seems we can't use the implemented methods of a Scala trait from Java([see details](https://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods)) which prevents us from turning this class to a Java one. For this class, it needs to extend two traits, i.e., CommonCorrelate and DataStreamRel. Maybe we can keep this class as a Scala one for now?
   
   I have checked that the classes of Rule has been implemented in Java in this PR. 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378832786
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
 ##########
 @@ -40,10 +40,11 @@ class DataStreamCorrelateRule
     val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
     right match {
-      // right node is a table function
-      case scan: FlinkLogicalTableFunctionScan => true
+      // right node is a java table function
+      case scan: FlinkLogicalTableFunctionScan => PythonUtil.isNonPythonCall(scan.getCall)
       // a filter is pushed above the table function
-      case calc: FlinkLogicalCalc if CorrelateUtil.getTableFunctionScan(calc).isDefined => true
+      case calc: FlinkLogicalCalc =>
+        PythonUtil.isNonPythonCall(CorrelateUtil.getTableFunctionScan(calc).get.getCall)
 
 Review comment:
   The method `getTableFunctionScan` may returns None.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378830458
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
 ##########
 @@ -40,10 +40,11 @@ class DataStreamCorrelateRule
     val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
     right match {
-      // right node is a table function
-      case scan: FlinkLogicalTableFunctionScan => true
+      // right node is a java table function
 
 Review comment:
   I don't think the right node must be a java table function. The original comment is ok.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379968783
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelateBase.scala
 ##########
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.functions.utils.TableSqlFunction
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Base Flink RelNode which matches along with join a user defined table function.
+  */
+abstract class StreamExecCorrelateBase(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    val projectProgram: Option[RexProgram],
+    scan: FlinkLogicalTableFunctionScan,
+    condition: Option[RexNode],
+    outputRowType: RelDataType,
+    joinType: JoinRelType)
+  extends SingleRel(cluster, traitSet, inputRel)
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
+
+  override def producesUpdates: Boolean = false
+
+  override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+  override def consumesRetractions: Boolean = false
+
+  override def producesRetractions: Boolean = false
+
+  override def requireWatermark: Boolean = false
+
+  override def deriveRowType(): RelDataType = outputRowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    copy(traitSet, inputs.get(0), projectProgram, outputRowType)
+  }
+
+  /**
+    * Note: do not passing member 'child' because singleRel.replaceInput may update 'input' rel.
+    */
+  def copy(
+      traitSet: RelTraitSet,
+      newChild: RelNode,
+      projectProgram: Option[RexProgram],
+      outputType: RelDataType): RelNode
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val rexCall = scan.getCall.asInstanceOf[RexCall]
+    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
 
 Review comment:
   The test failed due to this line. We can remove this line as it has never been used. 
   Remember to remove the useless import after removing this line.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   * 67f8b75918a21c604569ef75f99663c08a26fcdf UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149164808 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230 TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/149250281 TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   * 67f8b75918a21c604569ef75f99663c08a26fcdf Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149164808) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220) 
   * f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149250281) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379968881
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelateBase.scala
 ##########
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.batch
+
+import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.delegation.BatchPlanner
+import org.apache.flink.table.planner.functions.utils.TableSqlFunction
+import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
+import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{Correlate, JoinRelType}
+import org.apache.calcite.rel.{RelCollationTraitDef, RelDistribution, RelFieldCollation, RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Base Batch physical RelNode for [[Correlate]] (user defined table function).
+  */
+abstract class BatchExecCorrelateBase(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    scan: FlinkLogicalTableFunctionScan,
+    condition: Option[RexNode],
+    projectProgram: Option[RexProgram],
+    outputRowType: RelDataType,
+    joinType: JoinRelType)
+  extends SingleRel(cluster, traitSet, inputRel)
+  with BatchPhysicalRel
+  with BatchExecNode[BaseRow] {
+
+  require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
+
+  override def deriveRowType(): RelDataType = outputRowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    copy(traitSet, inputs.get(0), projectProgram, outputRowType)
+  }
+
+  /**
+    * Note: do not passing member 'child' because singleRel.replaceInput may update 'input' rel.
+    */
+  def copy(
+      traitSet: RelTraitSet,
+      child: RelNode,
+      projectProgram: Option[RexProgram],
+      outputType: RelDataType): RelNode
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val rexCall = scan.getCall.asInstanceOf[RexCall]
+    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
 
 Review comment:
   ditto

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379227379
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelateBase.scala
 ##########
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * Base RelNode for data stream correlate.
+  */
+abstract class DataStreamCorrelateBase(
 
 Review comment:
   Hi @twalthr It seems we can't use the implemented methods of a Scala trait from Java([see details](https://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods)) which prevents us from turning this class to a Java one. For this class, it needs to extend two traits, i.e., CommonCorrelate and DataStreamRel. Maybe we can keep this class as a Scala one for now. What do you think?
   
   I have checked that the classes of Rule has been implemented in Java in this PR. 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379235507
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala
 ##########
 @@ -39,13 +40,13 @@ class BatchExecCorrelateRule extends ConverterRule(
     val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
     right match {
-      // right node is a table function
-      case _: FlinkLogicalTableFunctionScan => true
+      // right node is a java table function
+      case scan: FlinkLogicalTableFunctionScan => PythonUtil.isNonPythonCall(scan.getCall)
       // a filter is pushed above the table function
       case calc: FlinkLogicalCalc =>
-        calc.getInput.asInstanceOf[RelSubset]
-            .getOriginal.isInstanceOf[FlinkLogicalTableFunctionScan]
-      case _ => false
+        val scan = calc.getInput.asInstanceOf[RelSubset]
+          .getOriginal.asInstanceOf[FlinkLogicalTableFunctionScan]
 
 Review comment:
   The original may not be a `FlinkLogicalTableFunctionScan`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379235239
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala
 ##########
 @@ -39,13 +40,13 @@ class BatchExecCorrelateRule extends ConverterRule(
     val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
     right match {
-      // right node is a table function
-      case _: FlinkLogicalTableFunctionScan => true
+      // right node is a java table function
+      case scan: FlinkLogicalTableFunctionScan => PythonUtil.isNonPythonCall(scan.getCall)
       // a filter is pushed above the table function
       case calc: FlinkLogicalCalc =>
-        calc.getInput.asInstanceOf[RelSubset]
-            .getOriginal.isInstanceOf[FlinkLogicalTableFunctionScan]
-      case _ => false
 
 Review comment:
   Please recover this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379240935
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
 ##########
 @@ -18,30 +18,19 @@
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CorrelateCodeGenerator}
 import org.apache.flink.table.planner.delegation.BatchPlanner
-import org.apache.flink.table.planner.functions.utils.TableSqlFunction
-import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
-import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil
 
-import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{Correlate, JoinRelType}
-import org.apache.calcite.rel.{RelCollationTraitDef, RelDistribution, RelFieldCollation, RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
-import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
-
-import java.util
-
-import scala.collection.JavaConversions._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.{RexNode, RexProgram}
 
 /**
-  * Batch physical RelNode for [[Correlate]] (user defined table function).
+  * Batch physical RelNode for [[Correlate]] (Java user defined table function).
 
 Review comment:
   Java => Java/Scala ?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149164808 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   * 67f8b75918a21c604569ef75f99663c08a26fcdf Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149164808) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378822971
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
 ##########
 @@ -113,6 +114,25 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
   }
 }
 
+class PythonTableFunction extends TableFunction[Row] with PythonFunction {
+
+  def eval(x: Int, y: Int): Unit = {
+    for (i <- 0 until y) {
+      val row = new Row(2)
+      row.setField(0, x)
+      row.setField(1, i * i)
+      collect(row)
+    }
 
 Review comment:
   How about remove these lines. The code will never be called.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379968783
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelateBase.scala
 ##########
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.functions.utils.TableSqlFunction
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Base Flink RelNode which matches along with join a user defined table function.
+  */
+abstract class StreamExecCorrelateBase(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    val projectProgram: Option[RexProgram],
+    scan: FlinkLogicalTableFunctionScan,
+    condition: Option[RexNode],
+    outputRowType: RelDataType,
+    joinType: JoinRelType)
+  extends SingleRel(cluster, traitSet, inputRel)
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
+
+  override def producesUpdates: Boolean = false
+
+  override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+  override def consumesRetractions: Boolean = false
+
+  override def producesRetractions: Boolean = false
+
+  override def requireWatermark: Boolean = false
+
+  override def deriveRowType(): RelDataType = outputRowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    copy(traitSet, inputs.get(0), projectProgram, outputRowType)
+  }
+
+  /**
+    * Note: do not passing member 'child' because singleRel.replaceInput may update 'input' rel.
+    */
+  def copy(
+      traitSet: RelTraitSet,
+      newChild: RelNode,
+      projectProgram: Option[RexProgram],
+      outputType: RelDataType): RelNode
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val rexCall = scan.getCall.asInstanceOf[RexCall]
+    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
 
 Review comment:
   The test failed due to this line. We can remove this line as it has never been used. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379228490
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java
 ##########
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
+ * {@link StreamExecPythonCorrelate}.
+ */
+public class StreamExecPythonCorrelateRule extends ConverterRule {
+
+	public static final RelOptRule INSTANCE = new StreamExecPythonCorrelateRule();
+
+	private StreamExecPythonCorrelateRule() {
+		super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.STREAM_PHYSICAL(),
+			"StreamExecPythonCorrelateRule");
+	}
+
+	// find only calc and table function
+	private boolean findTableFunction(FlinkLogicalCalc calc) {
+		RelNode child = ((RelSubset) calc.getInput()).getOriginal();
+		if (child instanceof FlinkLogicalTableFunctionScan) {
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) child;
+			return PythonUtil.isPythonCall(scan.getCall());
+		} else if (child instanceof FlinkLogicalCalc) {
+			FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child;
+			return findTableFunction(childCalc);
+		}
+		return false;
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		FlinkLogicalCorrelate correlate = call.rel(0);
+		RelNode right = ((RelSubset) correlate.getRight()).getOriginal();
+		if (right instanceof FlinkLogicalTableFunctionScan) {
+			// right node is a python table function
 
 Review comment:
   The right node may not be a python table function.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/149164808 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   * 67f8b75918a21c604569ef75f99663c08a26fcdf Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149164808) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 closed pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 closed pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379234575
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
 ##########
 @@ -178,4 +177,15 @@ class CorrelateTest extends TableTestBase {
       .flatMap(func2('f3))
     util.verifyPlan(resultTable)
   }
+
+  @Test
+  def testCorrelatePythonTableFunction(): Unit = {
+    val util = streamTestUtil()
+    val sourceTable = util.addTableSource[(Int, Int, String)]("MyTable", 'a, 'b, 'c)
+    val func = new PythonTableFunction
+    util.addFunction("pyFunc", func)
 
 Review comment:
   We don't need to register the function here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584108384
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 68b071496379b090212eb961b2026578ee2c64e3 (Mon Feb 10 12:54:15 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-15961).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-584123392
 
 
   <!--
   Meta data
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:68b071496379b090212eb961b2026578ee2c64e3 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148190648 TriggerType:PUSH TriggerID:68b071496379b090212eb961b2026578ee2c64e3
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:9c17e490eca0cba3ca56c8094042b7e97bb217a4 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726395 TriggerType:PUSH TriggerID:9c17e490eca0cba3ca56c8094042b7e97bb217a4
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:67f8b75918a21c604569ef75f99663c08a26fcdf Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149164808 TriggerType:PUSH TriggerID:67f8b75918a21c604569ef75f99663c08a26fcdf
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230 TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/149250281 TriggerType:PUSH TriggerID:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8
   Hash:f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/149250281 TriggerType:MANUAL TriggerID:587302161
   -->
   ## CI report:
   
   * 68b071496379b090212eb961b2026578ee2c64e3 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148190648) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5010) 
   * 9c17e490eca0cba3ca56c8094042b7e97bb217a4 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726395) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5121) 
   * 67f8b75918a21c604569ef75f99663c08a26fcdf Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/149164808) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5220) 
   * f0fbdc6d21c7c57c51366e53a149dc8b8db118f8 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149250281) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5230) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] HuangXingBo commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-586809475
 
 
   > @HuangXingBo Thanks a lot for the update. The test failed due to the problems in `BatchExecCorrelateBase` and `StreamExecCorrelateBase`. See the detailed comments below.
   > 
   > As for the last commit that adding implementations for the Python Correlate RelNode, maybe it's better to add the commit later. Because in this PR, there are no ways to write tests to cover the implementation.
   > 
   > What do you think?
   
   Thanks a lot for @hequn8128 , I will move the last commit to FLINK-15972.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379234497
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
 ##########
 @@ -117,4 +116,15 @@ class CorrelateTest extends TableTestBase {
 
     util.verifyPlan(result)
   }
+
+  @Test
+  def testCorrelatePythonTableFunction(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTableSource[(Int, Int, String)]("MyTable", 'a, 'b, 'c)
+    val func = new PythonTableFunction
+    util.addFunction("pyFunc", func)
 
 Review comment:
   We don't need to register the function here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379243229
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 ##########
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
+ * {@link BatchExecPythonCorrelate}.
+ */
+public class BatchExecPythonCorrelateRule extends ConverterRule {
+
+	public static final RelOptRule INSTANCE = new BatchExecPythonCorrelateRule();
+
+	private BatchExecPythonCorrelateRule() {
+		super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.BATCH_PHYSICAL(),
+			"BatchExecPythonCorrelateRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		FlinkLogicalCorrelate join = call.rel(0);
+		RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+		if (right instanceof FlinkLogicalTableFunctionScan) {
+			// right node is a python table function
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right;
+			return PythonUtil.isPythonCall(scan.getCall());
+		} else if (right instanceof FlinkLogicalCalc) {
+			// a filter is pushed above the table function
+			FlinkLogicalCalc calc = (FlinkLogicalCalc) right;
+			RelSubset input = (RelSubset) calc.getInput();
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) input.getOriginal();
+			return PythonUtil.isPythonCall(scan.getCall());
+		}
+		return false;
+	}
+
+	@Override
+	public RelNode convert(RelNode relNode) {
+		BatchExecPythonCorrelateFactory factory = new BatchExecPythonCorrelateFactory(relNode);
+		return factory.convertToCorrelate();
+	}
+
+	/**
+	 * The factory is responsible to creating {@link BatchExecPythonCorrelate}.
+	 */
+	private static class BatchExecPythonCorrelateFactory {
+		private final RelNode correlateRel;
+		private final FlinkLogicalCorrelate correlate;
+		private final RelTraitSet traitSet;
+		private final RelNode convInput;
+		private final RelNode right;
+
+		BatchExecPythonCorrelateFactory(RelNode rel) {
+			this.correlateRel = rel;
 
 Review comment:
   Remove this member?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379242650
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 ##########
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
+ * {@link BatchExecPythonCorrelate}.
+ */
+public class BatchExecPythonCorrelateRule extends ConverterRule {
+
+	public static final RelOptRule INSTANCE = new BatchExecPythonCorrelateRule();
+
+	private BatchExecPythonCorrelateRule() {
+		super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.BATCH_PHYSICAL(),
+			"BatchExecPythonCorrelateRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		FlinkLogicalCorrelate join = call.rel(0);
+		RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+		if (right instanceof FlinkLogicalTableFunctionScan) {
+			// right node is a python table function
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right;
+			return PythonUtil.isPythonCall(scan.getCall());
+		} else if (right instanceof FlinkLogicalCalc) {
+			// a filter is pushed above the table function
+			FlinkLogicalCalc calc = (FlinkLogicalCalc) right;
+			RelSubset input = (RelSubset) calc.getInput();
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) input.getOriginal();
 
 Review comment:
   We can't cast the original node to `FlinkLogicalTableFunctionScan`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379235017
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala
 ##########
 @@ -39,13 +40,13 @@ class BatchExecCorrelateRule extends ConverterRule(
     val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
     right match {
-      // right node is a table function
-      case _: FlinkLogicalTableFunctionScan => true
+      // right node is a java table function
 
 Review comment:
   The right node may not be a java table function

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379228461
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 ##########
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
+ * {@link BatchExecPythonCorrelate}.
+ */
+public class BatchExecPythonCorrelateRule extends ConverterRule {
+
+	public static final RelOptRule INSTANCE = new BatchExecPythonCorrelateRule();
+
+	private BatchExecPythonCorrelateRule() {
+		super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.BATCH_PHYSICAL(),
+			"BatchExecPythonCorrelateRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		FlinkLogicalCorrelate join = call.rel(0);
+		RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+		if (right instanceof FlinkLogicalTableFunctionScan) {
+			// right node is a python table function
 
 Review comment:
   The right node may not be a python table function.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379244169
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java
 ##########
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
+ * {@link StreamExecPythonCorrelate}.
+ */
+public class StreamExecPythonCorrelateRule extends ConverterRule {
+
+	public static final RelOptRule INSTANCE = new StreamExecPythonCorrelateRule();
+
+	private StreamExecPythonCorrelateRule() {
+		super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.STREAM_PHYSICAL(),
+			"StreamExecPythonCorrelateRule");
+	}
+
+	// find only calc and table function
+	private boolean findTableFunction(FlinkLogicalCalc calc) {
+		RelNode child = ((RelSubset) calc.getInput()).getOriginal();
+		if (child instanceof FlinkLogicalTableFunctionScan) {
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) child;
+			return PythonUtil.isPythonCall(scan.getCall());
+		} else if (child instanceof FlinkLogicalCalc) {
+			FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child;
+			return findTableFunction(childCalc);
+		}
+		return false;
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		FlinkLogicalCorrelate correlate = call.rel(0);
+		RelNode right = ((RelSubset) correlate.getRight()).getOriginal();
+		if (right instanceof FlinkLogicalTableFunctionScan) {
+			// right node is a python table function
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right;
+			return PythonUtil.isPythonCall(scan.getCall());
+		} else if (right instanceof FlinkLogicalCalc) {
+			// a filter is pushed above the table function
+			return findTableFunction((FlinkLogicalCalc) right);
+		}
+		return false;
+	}
+
+	@Override
+	public RelNode convert(RelNode relNode) {
+		StreamExecPythonCorrelateFactory factory = new StreamExecPythonCorrelateFactory(relNode);
+		return factory.convertToCorrelate();
+	}
+
+	/**
+	 * The factory is responsible to creating {@link StreamExecPythonCorrelate}.
 
 Review comment:
   responsible to => responsible for

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r378836753
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelateBase.scala
 ##########
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * Base RelNode for data stream correlate.
+  */
+abstract class DataStreamCorrelateBase(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputSchema: RowSchema,
+    input: RelNode,
+    scan: FlinkLogicalTableFunctionScan,
+    condition: Option[RexNode],
+    schema: RowSchema,
+    joinSchema: RowSchema,
+    joinType: JoinRelType,
+    ruleDescription: String)
 
 Review comment:
   Remove this member. It has never been used.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on issue #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#issuecomment-587375958
 
 
   @HuangXingBo Hi, the jira id in the commit log is wrong. Would be great if you can check on this next time.
   Merging...

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379241683
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
 ##########
 @@ -21,59 +21,37 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CorrelateCodeGenerator}
 import org.apache.flink.table.planner.delegation.StreamPlanner
-import org.apache.flink.table.planner.functions.utils.TableSqlFunction
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil
 import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
-
-import java.util
-
-import scala.collection.JavaConversions._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.{RexNode, RexProgram}
 
 /**
-  * Flink RelNode which matches along with join a user defined table function.
+  * Flink RelNode which matches along with join a java user defined table function.
 
 Review comment:
   java => Java/Scala

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379230207
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 ##########
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
+ * {@link BatchExecPythonCorrelate}.
+ */
+public class BatchExecPythonCorrelateRule extends ConverterRule {
+
+	public static final RelOptRule INSTANCE = new BatchExecPythonCorrelateRule();
+
+	private BatchExecPythonCorrelateRule() {
+		super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.BATCH_PHYSICAL(),
+			"BatchExecPythonCorrelateRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		FlinkLogicalCorrelate join = call.rel(0);
+		RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+		if (right instanceof FlinkLogicalTableFunctionScan) {
+			// right node is a python table function
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right;
+			return PythonUtil.isPythonCall(scan.getCall());
+		} else if (right instanceof FlinkLogicalCalc) {
+			// a filter is pushed above the table function
+			FlinkLogicalCalc calc = (FlinkLogicalCalc) right;
+			RelSubset input = (RelSubset) calc.getInput();
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) input.getOriginal();
+			return PythonUtil.isPythonCall(scan.getCall());
+		}
+		return false;
+	}
+
+	@Override
+	public RelNode convert(RelNode relNode) {
+		BatchExecPythonCorrelateFactory factory = new BatchExecPythonCorrelateFactory(relNode);
+		return factory.convertToCorrelate();
+	}
+
+	/**
+	 * The factory is responsible to creating {@link BatchExecPythonCorrelate}.
+	 */
+	private static class BatchExecPythonCorrelateFactory {
+		private final RelNode correlateRel;
 
 Review comment:
   We can remove this member.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11051: [FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction
URL: https://github.com/apache/flink/pull/11051#discussion_r379229634
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 ##########
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import scala.Option;
+import scala.Some;
+
+/**
+ * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
+ * {@link BatchExecPythonCorrelate}.
+ */
+public class BatchExecPythonCorrelateRule extends ConverterRule {
+
+	public static final RelOptRule INSTANCE = new BatchExecPythonCorrelateRule();
+
+	private BatchExecPythonCorrelateRule() {
+		super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.BATCH_PHYSICAL(),
+			"BatchExecPythonCorrelateRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		FlinkLogicalCorrelate join = call.rel(0);
+		RelNode right = ((RelSubset) join.getRight()).getOriginal();
+
+		if (right instanceof FlinkLogicalTableFunctionScan) {
+			// right node is a python table function
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right;
+			return PythonUtil.isPythonCall(scan.getCall());
+		} else if (right instanceof FlinkLogicalCalc) {
+			// a filter is pushed above the table function
+			FlinkLogicalCalc calc = (FlinkLogicalCalc) right;
+			RelSubset input = (RelSubset) calc.getInput();
+			FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) input.getOriginal();
+			return PythonUtil.isPythonCall(scan.getCall());
+		}
+		return false;
+	}
+
+	@Override
+	public RelNode convert(RelNode relNode) {
+		BatchExecPythonCorrelateFactory factory = new BatchExecPythonCorrelateFactory(relNode);
+		return factory.convertToCorrelate();
+	}
+
+	/**
+	 * The factory is responsible to creating {@link BatchExecPythonCorrelate}.
 
 Review comment:
   responsible for

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services