Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/19 07:19:05 UTC

[GitHub] [flink] HuangXingBo opened a new pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

HuangXingBo opened a new pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130
 
 
   ## What is the purpose of the change
   
   *This pull request adds the Python building blocks needed for the basic functionality of Python TableFunction to work.*
   
   
   ## Brief change log
   
     - *Implement the physical Python Correlate node*
     - *Add TableFunctionRowCoder*
     - *Add basic building blocks in udf.py and operations.py for Python UDTF execution*
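
As a rough illustration of what a correlate node computes, the sketch below joins each input row with the rows a table function yields for it, in both inner and left-outer form. It is plain Python and does not use PyFlink; `split_words`, `correlate`, and the `null_row` padding argument are hypothetical names introduced only for this example.

```python
def split_words(line):
    """Hypothetical table function: yields one (word, length) row per word."""
    for word in line.split():
        yield (word, len(word))


def correlate(rows, table_function, left_outer=False, null_row=(None, None)):
    """Join every input row with the rows the table function yields for it.

    With left_outer=True, an input row that produces no output is still
    emitted once, padded with null_row.
    """
    for row in rows:
        produced = False
        for out in table_function(row[0]):
            produced = True
            yield row + out
        if left_outer and not produced:
            yield row + null_row


rows = [("hello flink",), ("",)]
inner = list(correlate(rows, split_words))
outer = list(correlate(rows, split_words, left_outer=True))
```

With an inner join the empty line disappears from the result, while the left-outer variant keeps it with null padding. That difference is presumably why the operator in this PR takes a join type.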
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added integration tests in test_udtf.py*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not applicable)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384226509
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.common
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
 
 Review comment:
   private?


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * b03780cc0826c6e5a131b9b7f0e042d58d9364a8 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150641573) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5619) 
   * c79d7a9aa04b43cc0258535d2f676ee52bc9534d Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150763583) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384298213
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,107 @@
+
+package org.apache.flink.table.planner.plan.nodes.common
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
+      config: Configuration,
+      inputRowType: BaseRowTypeInfo,
+      outputRowType: BaseRowTypeInfo,
+      pythonFunctionInfo: PythonFunctionInfo,
+      udtfInputOffsets: Array[Int],
+      joinType: JoinRelType): OneInputStreamOperator[BaseRow, BaseRow] = {
+    val clazz = loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME)
+    val ctor = clazz.getConstructor(
+      classOf[Configuration],
+      classOf[PythonFunctionInfo],
+      classOf[RowType],
+      classOf[RowType],
+      classOf[Array[Int]],
+      classOf[JoinRelType])
+    ctor.newInstance(
+      config,
+      pythonFunctionInfo,
+      inputRowType.toRowType,
+      outputRowType.toRowType,
+      udtfInputOffsets,
+      joinType)
+      .asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]]
+  }
+
+  private[flink] def extractPythonTableFunctionInfo(
 
 Review comment:
   protected?


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * a34a306f023189434d990edd1a85eecfa324842a Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149583826) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5298) 
   

[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r383885581
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/operations.py
 ##########
 @@ -121,19 +122,19 @@ def get(self, value):
         return self._constant_value
 
 
-class ScalarFunctionInvoker(object):
+class StatelessFunctionInvoker(object):
 
 Review comment:
   Also rename `ScalarFunctionInputGetter` to `StatelessFunctionInputGetter`. BTW, please update the related comments as well.


[GitHub] [flink] HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384298074
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,107 @@
+
+package org.apache.flink.table.planner.plan.nodes.common
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
 
 Review comment:
   protected?


[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r383874038
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/operations.py
 ##########
 @@ -211,59 +212,94 @@ def setup(self, main_receivers):
             per_element_output_counter=None)
 
     def open(self):
-        for invoker in self.scalar_function_invokers:
+        for invoker in self.user_defined_function_invokers:
             invoker.invoke_open()
 
     def close(self):
-        for invoker in self.scalar_function_invokers:
+        for invoker in self.user_defined_function_invokers:
             invoker.invoke_close()
 
+
+class ScalarFunctionRunner(StatelessFunctionRunner):
+    """
+    The runner which is responsible for executing the scalar functions and send the
+    execution results back to the remote Java operator.
+
+    :param udfs_proto: protocol representation for the scalar functions to execute
+    """
+
+    def __init__(self, udfs_proto):
+        super(ScalarFunctionRunner, self).__init__(udfs_proto)
+
     def process(self, windowed_value):
         results = [invoker.invoke_eval(windowed_value.value) for invoker in
-                   self.scalar_function_invokers]
+                   self.user_defined_function_invokers]
         # send the execution results back
         self.output_processor.process_outputs(windowed_value, [results])
 
 
-class ScalarFunctionOperation(Operation):
+class TableFunctionRunner(StatelessFunctionRunner):
     """
-    An operation that will execute ScalarFunctions for each input element.
+    The runner which is responsible for executing the table functions and send the
+    execution results back to the remote Java operator.
+
+    :param udfs_proto: protocol representation for the table functions to execute
+    """
+
+    def __init__(self, udfs_proto):
 
 Review comment:
   udfs_proto => udtf_proto?
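
The diff above splits the runner into a shared base class with scalar and table variants. Below is a minimal sketch of that split, in plain Python without PyFlink or Beam. The class names mirror the diff, but the constructor arguments, the `emit` callback, and the body of `TableFunctionRunner.process` are assumptions made for illustration, not the PR's actual code.

```python
class StatelessFunctionRunner(object):
    """Shared setup; subclasses decide how results are emitted."""

    def __init__(self, functions, emit):
        self.user_defined_functions = functions
        # Hypothetical callback standing in for the output processor.
        self.emit = emit

    def process(self, value):
        raise NotImplementedError


class ScalarFunctionRunner(StatelessFunctionRunner):
    def process(self, value):
        # One result per function, emitted together as a single output row.
        self.emit([f(value) for f in self.user_defined_functions])


class TableFunctionRunner(StatelessFunctionRunner):
    def process(self, value):
        # Assumption: each row produced by a table function is emitted separately.
        for f in self.user_defined_functions:
            for row in f(value):
                self.emit(row)


scalar_out, table_out = [], []
ScalarFunctionRunner([len, str.upper], scalar_out.append).process("ab")
TableFunctionRunner([lambda s: ((c,) for c in s)], table_out.append).process("ab")
```

In this sketch the scalar runner emits exactly one output per input, while the table runner may emit zero or more.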


[GitHub] [flink] hequn8128 closed pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 closed pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130
 
 
   


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * a34a306f023189434d990edd1a85eecfa324842a Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149583826) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5298) 
   

[GitHub] [flink] hequn8128 commented on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-591736003
 
 
   @HuangXingBo Thanks a lot for the update. The tests failed due to a checkstyle problem.


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * 8dffbd51da45135595bb8ba1b0c48f24d7ccef0a UNKNOWN
   * ddcd843e92059b7bc14bcddb6aa1e8580d00d481 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150785561) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5669) 
   

[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * c79d7a9aa04b43cc0258535d2f676ee52bc9534d Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150763583) 
   

[GitHub] [flink] HuangXingBo commented on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-591738612
 
 
   Thanks a lot, @hequn8128. I have addressed the code style problem in the latest commit.

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384225394
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCalc.scala
 ##########
 @@ -36,70 +36,20 @@ import org.apache.flink.table.types.logical.RowType
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 
-trait CommonPythonCalc {
-
-  def loadClass(className: String): Class[_] = {
-    try {
-      Class.forName(className, false, Thread.currentThread.getContextClassLoader)
-    } catch {
-      case ex: ClassNotFoundException => throw new TableException(
-        "The dependency of 'flink-python' is not present on the classpath.", ex)
-    }
-  }
-
-  private lazy val convertLiteralToPython = {
-    val clazz = loadClass("org.apache.flink.api.common.python.PythonBridgeUtils")
-    clazz.getMethod("convertLiteralToPython", classOf[RexLiteral], classOf[SqlTypeName])
-  }
+trait CommonPythonCalc extends CommonPythonBase{
 
 Review comment:
   Add a space after `CommonPythonBase`.

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * b03780cc0826c6e5a131b9b7f0e042d58d9364a8 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150641573) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5619) 
   * c79d7a9aa04b43cc0258535d2f676ee52bc9534d UNKNOWN
   

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384238861
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/coders.py
 ##########
 @@ -38,6 +39,38 @@
            'TimestampCoder', 'ArrayCoder', 'MapCoder', 'DecimalCoder']
 
 
+class TableFunctionRowCoder(FastCoder):
+    """
+    Coder for Table Function Row.
+    """
+    def __init__(self, row_coder):
 
 Review comment:
   How about renaming `row_coder` to `flatten_row_coder`? That would make it easier to distinguish `flatten_row_coder` from `row_coder`.

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r383892660
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/operations.py
 ##########
 @@ -148,54 +149,54 @@ def __init__(self, scalar_function, inputs):
 
     def invoke_open(self):
         """
-        Invokes the ScalarFunction.open() function.
+        Invokes the StatelessFunction.open() function.
         """
         for input_getter in self.input_getters:
             input_getter.open()
         # set the FunctionContext to None for now
-        self.scalar_function.open(None)
+        self.user_defined_function.open(None)
 
     def invoke_close(self):
         """
-        Invokes the ScalarFunction.close() function.
+        Invokes the StatelessFunction.close() function.
         """
         for input_getter in self.input_getters:
             input_getter.close()
-        self.scalar_function.close()
+        self.user_defined_function.close()
 
     def invoke_eval(self, value):
         """
-        Invokes the ScalarFunction.eval() function.
+        Invokes the StatelessFunction.eval() function.
 
         :param value: the input element for which eval() method should be invoked
         """
         args = [input_getter.get(value) for input_getter in self.input_getters]
-        return self.scalar_function.eval(*args)
+        return self.user_defined_function.eval(*args)
 
 
-def create_scalar_function_invoker(scalar_function_proto):
+def create_stateless_function_invoker(user_defined_function_proto):
     """
-    Creates :class:`ScalarFunctionInvoker` from the proto representation of a
-    :class:`ScalarFunction`.
+    Creates :class:`StatelessFunctionInvoker` from the proto representation of a
+    :class:`ScalarFunction` or :class:`TableFunction`.
 
-    :param scalar_function_proto: the proto representation of the Python :class:`ScalarFunction`
-    :return: :class:`ScalarFunctionInvoker`.
+    :param user_defined_function_proto: the proto representation of the Python
+    :class:`ScalarFunction` or :class:`TableFunction`.
+    :return: :class:`TableFunctionInvoker`.
     """
-    scalar_function = cloudpickle.loads(scalar_function_proto.payload)
-    return ScalarFunctionInvoker(scalar_function, scalar_function_proto.inputs)
+    table_function = cloudpickle.loads(user_defined_function_proto.payload)
 
 Review comment:
   `table_function` => `user_defined_function`. Please check for similar naming errors throughout this class.
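
To illustrate why a neutral name fits here, the following is a minimal sketch, not the PR's actual code. It assumes a simplified invoker whose input getters are plain callables (the real code calls `input_getter.get(value)` and loads the function with `cloudpickle`). `AddOne` and `SplitWords` are hypothetical functions invented for the example. The same invoker serves both kinds of function, so a local variable called `table_function` would be misleading.

```python
class StatelessFunctionInvoker(object):
    """Illustrative invoker for any function object that exposes eval()."""

    def __init__(self, user_defined_function, input_getters):
        self.user_defined_function = user_defined_function
        self.input_getters = input_getters

    def invoke_eval(self, value):
        # Extract the arguments from the input row, then delegate to eval().
        args = [input_getter(value) for input_getter in self.input_getters]
        return self.user_defined_function.eval(*args)


class AddOne(object):
    """Hypothetical scalar-style function: one output per input."""

    def eval(self, x):
        return x + 1


class SplitWords(object):
    """Hypothetical table-style function: zero or more outputs per input."""

    def eval(self, text):
        for word in text.split():
            yield word


def create_stateless_function_invoker(user_defined_function, input_getters):
    # The parameter name stays neutral because the object may be either kind.
    return StatelessFunctionInvoker(user_defined_function, input_getters)


# Simplification: each getter is a callable that reads one field of the row.
first_field = [lambda row: row[0]]
scalar_invoker = create_stateless_function_invoker(AddOne(), first_field)
table_invoker = create_stateless_function_invoker(SplitWords(), first_field)

scalar_result = scalar_invoker.invoke_eval((41,))
table_result = list(table_invoker.invoke_eval(("a b",)))
```

The only difference visible to the caller is that the table-style function returns an iterable of results rather than a single value.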

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * c79d7a9aa04b43cc0258535d2f676ee52bc9534d Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150763583) 
   * 8dffbd51da45135595bb8ba1b0c48f24d7ccef0a UNKNOWN
   

----------------------------------------------------------------

[GitHub] [flink] flinkbot commented on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * a34a306f023189434d990edd1a85eecfa324842a UNKNOWN
   

----------------------------------------------------------------

[GitHub] [flink] HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384298213
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.common
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
+      config: Configuration,
+      inputRowType: BaseRowTypeInfo,
+      outputRowType: BaseRowTypeInfo,
+      pythonFunctionInfo: PythonFunctionInfo,
+      udtfInputOffsets: Array[Int],
+      joinType: JoinRelType): OneInputStreamOperator[BaseRow, BaseRow] = {
+    val clazz = loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME)
+    val ctor = clazz.getConstructor(
+      classOf[Configuration],
+      classOf[PythonFunctionInfo],
+      classOf[RowType],
+      classOf[RowType],
+      classOf[Array[Int]],
+      classOf[JoinRelType])
+    ctor.newInstance(
+      config,
+      pythonFunctionInfo,
+      inputRowType.toRowType,
+      outputRowType.toRowType,
+      udtfInputOffsets,
+      joinType)
+      .asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]]
+  }
+
+  private[flink] def extractPythonTableFunctionInfo(
 
 Review comment:
   protected?

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384239756
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/coder_impl.py
 ##########
 @@ -93,6 +93,30 @@ def __repr__(self):
         return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in self._field_coders)
 
 
+class TableFunctionRowCoderImpl(StreamCoderImpl):
+
+    def __init__(self, row_coder):
+        self._row_coder = row_coder
+        self._field_count = row_coder._filed_count
+
+    def encode_to_stream(self, value, out_stream, nested):
+        if value is None:
+            self.write_finish_message(out_stream)
 
 Review comment:
   Could we avoid the extra function call here, to prevent a performance regression? That is, call `out_stream.write_byte(0x00)` directly.
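
The two variants under discussion can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: `ByteStream` is a hypothetical stand-in that models only `write_byte`, and `FinishMarkerCoder` is an invented class holding both variants side by side. The sketch shows only that the output is identical; the inlined variant simply skips one Python-level method call each time the finish marker is written.

```python
class ByteStream(object):
    """Hypothetical minimal output stream; only write_byte is modelled."""

    def __init__(self):
        self.data = bytearray()

    def write_byte(self, value):
        self.data.append(value)


class FinishMarkerCoder(object):
    """Invented for illustration; holds both variants for comparison."""

    def write_finish_message(self, out_stream):
        out_stream.write_byte(0x00)

    def encode_via_helper(self, value, out_stream):
        # Variant in the diff: delegate to a helper method.
        if value is None:
            self.write_finish_message(out_stream)

    def encode_inlined(self, value, out_stream):
        # Variant suggested in review: write the marker byte directly.
        if value is None:
            out_stream.write_byte(0x00)


coder = FinishMarkerCoder()
helper_stream = ByteStream()
inlined_stream = ByteStream()
coder.encode_via_helper(None, helper_stream)
coder.encode_inlined(None, inlined_stream)
```

Whether the saved call is measurable depends on the workload, so it would need to be benchmarked on the real coder.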

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * c79d7a9aa04b43cc0258535d2f676ee52bc9534d Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150763583) 
   * 8dffbd51da45135595bb8ba1b0c48f24d7ccef0a UNKNOWN
   * ddcd843e92059b7bc14bcddb6aa1e8580d00d481 UNKNOWN
   

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * a34a306f023189434d990edd1a85eecfa324842a Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149583826) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5298) 
   

----------------------------------------------------------------

[GitHub] [flink] HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384298074
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.common
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
 
 Review comment:
   protected?

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * 8dffbd51da45135595bb8ba1b0c48f24d7ccef0a UNKNOWN
   * ddcd843e92059b7bc14bcddb6aa1e8580d00d481 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150785561) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5669) 
   

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r383903241
 
 

 ##########
 File path: flink-python/pyflink/table/table_environment.py
 ##########
 @@ -756,8 +757,12 @@ def register_function(self, name, function):
         :param function: The python user-defined function to register.
         :type function: pyflink.table.udf.UserDefinedFunctionWrapper
         """
-        self._j_tenv.registerFunction(name, function._judf(self._is_blink_planner,
-                                                           self.get_config()._j_table_config))
+        if isinstance(function, UserDefinedScalarFunctionWrapper):
 
 Review comment:
   We can add a base method in `UserDefinedScalarFunctionWrapper` and call the method here. For example, we can rename the current `_judf` to `_j_user_defined_function`
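
   A minimal sketch of this suggestion, assuming a shared base wrapper class. The class bodies, the tuple return values, and `FakeJavaTableEnvironment` are hypothetical stand-ins that only illustrate the dispatch; they are not the actual PyFlink implementation:

```python
class UserDefinedFunctionWrapper(object):
    """Hypothetical base wrapper; subclasses build the Java-side function."""

    def __init__(self, func):
        self._func = func

    def _j_user_defined_function(self, is_blink_planner, j_table_config):
        raise NotImplementedError


class UserDefinedScalarFunctionWrapper(UserDefinedFunctionWrapper):
    def _j_user_defined_function(self, is_blink_planner, j_table_config):
        # Stand-in for the real Java scalar function object.
        return ("scalar", self._func.__name__)


class UserDefinedTableFunctionWrapper(UserDefinedFunctionWrapper):
    def _j_user_defined_function(self, is_blink_planner, j_table_config):
        # Stand-in for the real Java table function object.
        return ("table", self._func.__name__)


class FakeJavaTableEnvironment(object):
    """Hypothetical replacement for the Java TableEnvironment (`_j_tenv`)."""

    def __init__(self):
        self.registered = {}

    def registerFunction(self, name, j_function):
        self.registered[name] = j_function


def register_function(j_tenv, name, function,
                      is_blink_planner=True, j_table_config=None):
    # No isinstance check: every wrapper exposes the same method.
    j_tenv.registerFunction(
        name, function._j_user_defined_function(is_blink_planner, j_table_config))


def add_one(x):
    return x + 1


def split(line):
    return line.split()


j_tenv = FakeJavaTableEnvironment()
register_function(j_tenv, "add_one", UserDefinedScalarFunctionWrapper(add_one))
register_function(j_tenv, "split", UserDefinedTableFunctionWrapper(split))
```

   With this shape, `register_function` stays the same when further wrapper types are added.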


[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384227948
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.plan.nodes.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
+      config: Configuration,
+      inputRowType: RowType,
+      outputRowType: RowType,
+      pythonFunctionInfo: PythonFunctionInfo,
+      udtfInputOffsets: Array[Int],
+      joinType: JoinRelType): OneInputStreamOperator[CRow, CRow] = {
+    val clazz = loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME)
+    val ctor = clazz.getConstructor(
+      classOf[Configuration],
+      classOf[PythonFunctionInfo],
+      classOf[RowType],
+      classOf[RowType],
+      classOf[Array[Int]],
+      classOf[JoinRelType])
+    ctor.newInstance(
+      config,
+      pythonFunctionInfo,
+      inputRowType,
+      outputRowType,
+      udtfInputOffsets,
+      joinType)
+      .asInstanceOf[OneInputStreamOperator[CRow, CRow]]
+  }
+
+  private[flink] def extractPythonTableFunctionInfo(
 
 Review comment:
   protected


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * a34a306f023189434d990edd1a85eecfa324842a Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149583826) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5298) 
   * b03780cc0826c6e5a131b9b7f0e042d58d9364a8 UNKNOWN
   


[GitHub] [flink] HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384298384
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.plan.nodes.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
 
 Review comment:
   private?


[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384226774
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.common
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
+      config: Configuration,
+      inputRowType: BaseRowTypeInfo,
+      outputRowType: BaseRowTypeInfo,
+      pythonFunctionInfo: PythonFunctionInfo,
+      udtfInputOffsets: Array[Int],
+      joinType: JoinRelType): OneInputStreamOperator[BaseRow, BaseRow] = {
+    val clazz = loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME)
+    val ctor = clazz.getConstructor(
+      classOf[Configuration],
+      classOf[PythonFunctionInfo],
+      classOf[RowType],
+      classOf[RowType],
+      classOf[Array[Int]],
+      classOf[JoinRelType])
+    ctor.newInstance(
+      config,
+      pythonFunctionInfo,
+      inputRowType.toRowType,
+      outputRowType.toRowType,
+      udtfInputOffsets,
+      joinType)
+      .asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]]
+  }
+
+  private[flink] def extractPythonTableFunctionInfo(
+      pythonRexCall: RexCall): (Array[Int], PythonFunctionInfo) = {
+    val inputNodes = new mutable.LinkedHashMap[RexNode, Integer]()
+    val pythonTableFunctionInfo = createPythonFunctionInfo(pythonRexCall, inputNodes)
+    val udtfInputOffsets = inputNodes.toArray
+      .map(_._1)
+      .collect { case inputRef: RexInputRef => inputRef.getIndex }
+    (udtfInputOffsets, pythonTableFunctionInfo)
+  }
+
+  def createPythonOneInputTransformation(
 
 Review comment:
   protected


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * a34a306f023189434d990edd1a85eecfa324842a Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/149583826) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5298) 
   * b03780cc0826c6e5a131b9b7f0e042d58d9364a8 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150641573) 
   


[GitHub] [flink] flinkbot commented on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588072699
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit a34a306f023189434d990edd1a85eecfa324842a (Wed Feb 19 07:21:47 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-15972).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] HuangXingBo commented on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-591387056
 
 
   Thanks a lot for the review, @hequn8128. I have addressed the comments in the latest commit.


[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r383877613
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/operations.py
 ##########
 @@ -211,59 +212,94 @@ def setup(self, main_receivers):
             per_element_output_counter=None)
 
     def open(self):
-        for invoker in self.scalar_function_invokers:
+        for invoker in self.user_defined_function_invokers:
             invoker.invoke_open()
 
     def close(self):
-        for invoker in self.scalar_function_invokers:
+        for invoker in self.user_defined_function_invokers:
             invoker.invoke_close()
 
+
+class ScalarFunctionRunner(StatelessFunctionRunner):
+    """
+    The runner which is responsible for executing the scalar functions and send the
+    execution results back to the remote Java operator.
+
+    :param udfs_proto: protocol representation for the scalar functions to execute
+    """
+
+    def __init__(self, udfs_proto):
+        super(ScalarFunctionRunner, self).__init__(udfs_proto)
+
     def process(self, windowed_value):
         results = [invoker.invoke_eval(windowed_value.value) for invoker in
-                   self.scalar_function_invokers]
+                   self.user_defined_function_invokers]
         # send the execution results back
         self.output_processor.process_outputs(windowed_value, [results])
 
 
-class ScalarFunctionOperation(Operation):
+class TableFunctionRunner(StatelessFunctionRunner):
     """
-    An operation that will execute ScalarFunctions for each input element.
+    The runner which is responsible for executing the table functions and send the
+    execution results back to the remote Java operator.
+
+    :param udfs_proto: protocol representation for the table functions to execute
+    """
+
+    def __init__(self, udfs_proto):
+        super(TableFunctionRunner, self).__init__(udfs_proto)
+
+    def create_result(self, value):
+        result = self.user_defined_function_invokers[0].invoke_eval(value)
+        if result is not None:
+            yield from result
+        yield None
+
+    def process(self, windowed_value):
+        results = self.create_result(windowed_value.value)
+        self.output_processor.process_outputs(windowed_value, results)
+
+
+class StatelessFunctionOperation(Operation):
+    """
+    Base class of stateless function operation that will execute TableFunction for each input
 
 Review comment:
   will execute TableFunction => will execute TableFunction or ScalarFunction
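
   For context, the `create_result` generator in the hunk above can be sketched on its own as follows. This is only an illustration: `eval_func` stands in for the invoker's `invoke_eval`, `split_words` and `emit_nothing` are hypothetical table functions, and reading the trailing `None` as an end-of-results marker for each input row is an assumption drawn from the diff:

```python
def create_result(eval_func, value):
    """Yield every row produced for one input, then a trailing None."""
    result = eval_func(value)
    if result is not None:
        # A table function may return any iterable of rows.
        yield from result
    # Assumed end marker: signals that this input produced no more rows.
    yield None


def split_words(line):
    # Hypothetical table function: one output row per word.
    for word in line.split():
        yield word


def emit_nothing(line):
    # Hypothetical table function that produces no rows.
    return None


rows = list(create_result(split_words, "hello flink"))
no_rows = list(create_result(emit_nothing, "ignored"))
```

   Because `create_result` is a generator, rows are produced lazily as the output processor iterates over them.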


[GitHub] [flink] HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384298406
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.plan.nodes.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
+      config: Configuration,
+      inputRowType: RowType,
+      outputRowType: RowType,
+      pythonFunctionInfo: PythonFunctionInfo,
+      udtfInputOffsets: Array[Int],
+      joinType: JoinRelType): OneInputStreamOperator[CRow, CRow] = {
+    val clazz = loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME)
+    val ctor = clazz.getConstructor(
+      classOf[Configuration],
+      classOf[PythonFunctionInfo],
+      classOf[RowType],
+      classOf[RowType],
+      classOf[Array[Int]],
+      classOf[JoinRelType])
+    ctor.newInstance(
+      config,
+      pythonFunctionInfo,
+      inputRowType,
+      outputRowType,
+      udtfInputOffsets,
+      joinType)
+      .asInstanceOf[OneInputStreamOperator[CRow, CRow]]
+  }
+
+  private[flink] def extractPythonTableFunctionInfo(
 
 Review comment:
   private?


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   ## CI report:
   
   * b03780cc0826c6e5a131b9b7f0e042d58d9364a8 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150641573) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5619) 
   


[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384226616
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.common
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
+      config: Configuration,
+      inputRowType: BaseRowTypeInfo,
+      outputRowType: BaseRowTypeInfo,
+      pythonFunctionInfo: PythonFunctionInfo,
+      udtfInputOffsets: Array[Int],
+      joinType: JoinRelType): OneInputStreamOperator[BaseRow, BaseRow] = {
+    val clazz = loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME)
+    val ctor = clazz.getConstructor(
+      classOf[Configuration],
+      classOf[PythonFunctionInfo],
+      classOf[RowType],
+      classOf[RowType],
+      classOf[Array[Int]],
+      classOf[JoinRelType])
+    ctor.newInstance(
+      config,
+      pythonFunctionInfo,
+      inputRowType.toRowType,
+      outputRowType.toRowType,
+      udtfInputOffsets,
+      joinType)
+      .asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]]
+  }
+
+  private[flink] def extractPythonTableFunctionInfo(
 
 Review comment:
   private


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a34a306f023189434d990edd1a85eecfa324842a",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/149583826",
       "triggerID" : "a34a306f023189434d990edd1a85eecfa324842a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a34a306f023189434d990edd1a85eecfa324842a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5298",
       "triggerID" : "a34a306f023189434d990edd1a85eecfa324842a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b03780cc0826c6e5a131b9b7f0e042d58d9364a8",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/150641573",
       "triggerID" : "b03780cc0826c6e5a131b9b7f0e042d58d9364a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b03780cc0826c6e5a131b9b7f0e042d58d9364a8",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5619",
       "triggerID" : "b03780cc0826c6e5a131b9b7f0e042d58d9364a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b03780cc0826c6e5a131b9b7f0e042d58d9364a8 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150641573) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5619) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384298384
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.plan.nodes.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
 
 Review comment:
   private?


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588072699
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit ddcd843e92059b7bc14bcddb6aa1e8580d00d481 (Fri Feb 28 21:49:00 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r383878868
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/operations.py
 ##########
 @@ -276,11 +312,18 @@ def progress_metrics(self):
     SCALAR_FUNCTION_URN, flink_fn_execution_pb2.UserDefinedFunctions)
 def create(factory, transform_id, transform_proto, parameter, consumers):
 
 Review comment:
   Rename to `create_scalar_function`, making it consistent with `create_table_function`.


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   <!--
   Meta data
   Hash:a34a306f023189434d990edd1a85eecfa324842a Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/149583826 TriggerType:PUSH TriggerID:a34a306f023189434d990edd1a85eecfa324842a
   -->
   ## CI report:
   
   * a34a306f023189434d990edd1a85eecfa324842a Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/149583826) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r383852152
 
 

 ##########
 File path: flink-python/pyflink/table/tests/test_udtf.py
 ##########
 @@ -0,0 +1,91 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.table import DataTypes
+from pyflink.table.udf import TableFunction, udtf, ScalarFunction, udf
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
+    PyFlinkBlinkStreamTableTestCase
+
+
+class UserDefinedTableFunctionTests(object):
+
+    def test_table_function(self):
+        table_sink = source_sink_utils.TestAppendSink(
+            ['a', 'b', 'c'],
+            [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
+        self.t_env.register_table_sink("Results", table_sink)
+        self.t_env.register_function(
+            "multi_emit", udtf(MultiEmit(), [DataTypes.BIGINT(), DataTypes.BIGINT()],
+                               [DataTypes.BIGINT(), DataTypes.BIGINT()]))
+
+        self.t_env.register_function("condition_multi_emit", condition_multi_emit)
+
+        self.t_env.register_function(
+            "multi_num", udf(MultiNum(), [DataTypes.BIGINT()],
+                             DataTypes.BIGINT()))
+
+        t = self.t_env.from_elements([(1, 1, 3), (2, 1, 6), (3, 2, 9)], ['a', 'b', 'c'])
+        t.join_lateral("multi_emit(a, multi_num(b)) as (x, y)") \
+            .left_outer_join_lateral("condition_multi_emit(x, y) as m") \
+            .select("x, y, m") \
+            .insert_into("Results")
+        self.t_env.execute("test")
+        actual = source_sink_utils.results()
+        self.assert_equals(actual,
+                           ["1,0,null", "1,1,null", "2,0,null", "2,1,null", "3,0,0", "3,0,1",
+                            "3,0,2", "3,1,1", "3,1,2", "3,2,2", "3,3,null"])
+
+
+class PyFlinkStreamUserDefinedTableFunctionTests(UserDefinedTableFunctionTests,
+                                                 PyFlinkStreamTableTestCase):
+    pass
+
+
+class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
 
 Review comment:
   Also add test for blink planner under batch mode.
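
A rough sketch of how one shared test body can cover an additional execution mode, following the mixin layout the quoted test file already uses. The base classes `StreamEnvTestCase` and `BatchEnvTestCase` and the toy `explode` function are hypothetical stand-ins so the example runs without PyFlink; in the PR the batch variant would presumably extend a blink batch test base class from `pyflink.testing.test_case_utils`, which is an assumption here.

```python
import unittest


class StreamEnvTestCase(unittest.TestCase):
    """Hypothetical stand-in for a streaming-mode test base class."""
    mode = "stream"


class BatchEnvTestCase(unittest.TestCase):
    """Hypothetical stand-in for a batch-mode test base class."""
    mode = "batch"


def explode(n):
    """Toy table function: emit 0..n-1 for each input value."""
    return list(range(n))


class TableFunctionTestsMixin(object):
    """Shared test body; it is not a TestCase itself, so it only runs via subclasses."""

    def test_table_function(self):
        # Lateral-join-like expansion: one output row per emitted value.
        rows = [(a, x) for a in (1, 2) for x in explode(a)]
        self.assertEqual(rows, [(1, 0), (2, 0), (2, 1)])
        self.assertIn(self.mode, ("stream", "batch"))


class StreamTableFunctionTests(TableFunctionTestsMixin, StreamEnvTestCase):
    pass


class BatchTableFunctionTests(TableFunctionTestsMixin, BatchEnvTestCase):
    pass
```

Adding a mode is then a matter of declaring one more two-line subclass; the assertions themselves are written once in the mixin.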


[GitHub] [flink] hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384227921
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.plan.nodes.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
 
 Review comment:
   protected


[GitHub] [flink] HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r384298406
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonCorrelate.scala
 ##########
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.plan.nodes.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.mutable
+
+trait CommonPythonCorrelate extends CommonPythonBase {
+  def getPythonTableFunctionOperator(
+      config: Configuration,
+      inputRowType: RowType,
+      outputRowType: RowType,
+      pythonFunctionInfo: PythonFunctionInfo,
+      udtfInputOffsets: Array[Int],
+      joinType: JoinRelType): OneInputStreamOperator[CRow, CRow] = {
+    val clazz = loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME)
+    val ctor = clazz.getConstructor(
+      classOf[Configuration],
+      classOf[PythonFunctionInfo],
+      classOf[RowType],
+      classOf[RowType],
+      classOf[Array[Int]],
+      classOf[JoinRelType])
+    ctor.newInstance(
+      config,
+      pythonFunctionInfo,
+      inputRowType,
+      outputRowType,
+      udtfInputOffsets,
+      joinType)
+      .asInstanceOf[OneInputStreamOperator[CRow, CRow]]
+  }
+
+  private[flink] def extractPythonTableFunctionInfo(
 
 Review comment:
   private?


[GitHub] [flink] flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11130: [FLINK-15972][python][table-planner][table-planner-blink] Add Python building blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#issuecomment-588081932
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a34a306f023189434d990edd1a85eecfa324842a",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/149583826",
       "triggerID" : "a34a306f023189434d990edd1a85eecfa324842a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a34a306f023189434d990edd1a85eecfa324842a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5298",
       "triggerID" : "a34a306f023189434d990edd1a85eecfa324842a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b03780cc0826c6e5a131b9b7f0e042d58d9364a8",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/150641573",
       "triggerID" : "b03780cc0826c6e5a131b9b7f0e042d58d9364a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b03780cc0826c6e5a131b9b7f0e042d58d9364a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5619",
       "triggerID" : "b03780cc0826c6e5a131b9b7f0e042d58d9364a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c79d7a9aa04b43cc0258535d2f676ee52bc9534d",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/150763583",
       "triggerID" : "c79d7a9aa04b43cc0258535d2f676ee52bc9534d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dffbd51da45135595bb8ba1b0c48f24d7ccef0a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8dffbd51da45135595bb8ba1b0c48f24d7ccef0a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ddcd843e92059b7bc14bcddb6aa1e8580d00d481",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/150785561",
       "triggerID" : "ddcd843e92059b7bc14bcddb6aa1e8580d00d481",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ddcd843e92059b7bc14bcddb6aa1e8580d00d481",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5669",
       "triggerID" : "ddcd843e92059b7bc14bcddb6aa1e8580d00d481",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c79d7a9aa04b43cc0258535d2f676ee52bc9534d Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150763583) 
   * 8dffbd51da45135595bb8ba1b0c48f24d7ccef0a UNKNOWN
   * ddcd843e92059b7bc14bcddb6aa1e8580d00d481 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150785561) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5669) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>
