Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/10 03:11:44 UTC

[GitHub] [flink] HuangXingBo opened a new pull request #11044: [FLINK-15913][python] Add Python Table Function Runner And Operator In Blink Planner

HuangXingBo opened a new pull request #11044: [FLINK-15913][python] Add Python Table Function Runner And Operator In Blink Planner
URL: https://github.com/apache/flink/pull/11044
 
 
   ## What is the purpose of the change
   
   *This pull request adds a Python Table Function Runner and Operator for executing Python user-defined table functions in the Blink planner*
   
   
   ## Brief change log
   
     - *Add BaseRowPythonTableFunctionOperator*
     - *Add BaseRowPythonTableFunctionRunner*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Add runner test in BaseRowPythonTableFunctionRunnerTest*
     - *Add operator test in BaseRowPythonTableFunctionOperatorTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not applicable)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python Table Function Runner And Operator In Blink Planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python Table Function Runner And Operator In Blink Planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148130015 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148130015) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#discussion_r378227586
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/BaseRowPythonTableFunctionRunner.java
 ##########
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.runners.python;
+
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+/**
+ * A {@link PythonFunctionRunner} used to execute Python {@link TableFunction}.
+ * It takes {@link BaseRow} as the input and output type.
 
 Review comment:
   This is not true any more due to FLINK-15897. Change the description to:
   It takes {@link BaseRow} as the input and outputs a byte array.


[GitHub] [flink] hequn8128 closed pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
hequn8128 closed pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044
 
 
   


[GitHub] [flink] flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148130015 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726376 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5124 TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148731238 TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/148731238 TriggerType:MANUAL TriggerID:585627656
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148130015) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985) 
   * e0be5057cfc067796c51c4917f5ca2c824ee7872 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726376) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120) 
   * d35fcc7bee22f220f89314db15e1eda13f1bba02 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/148731238) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5124) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148130015 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726376 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5124 TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/148731238 TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148130015) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985) 
   * e0be5057cfc067796c51c4917f5ca2c824ee7872 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726376) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120) 
   * d35fcc7bee22f220f89314db15e1eda13f1bba02 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/148731238) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5124) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#discussion_r378218795
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/BaseRowPythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.runners.python.BaseRowPythonTableFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.io.IOException;
+
+/**
+ * The Python {@link TableFunction} operator for the blink planner.
+ */
+@Internal
+public class BaseRowPythonTableFunctionOperator
+	extends AbstractPythonTableFunctionOperator<BaseRow, BaseRow, BaseRow> {
+
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The collector used to collect records.
+	 */
+	private transient StreamRecordBaseRowWrappingCollector baseRowWrapper;
+
+	/**
+	 * The JoinedRow reused holding the execution result.
+	 */
+	private transient JoinedRow reuseJoinedRow;
+
+	/**
+	 * The Projection which projects the udtf input fields from the input row.
+	 */
+	private transient Projection<BaseRow, BinaryRow> udtfInputProjection;
+
+	/**
+	 * The TypeSerializer for udtf execution results.
+	 */
+	private transient TypeSerializer<BaseRow> udtfOutputTypeSerializer;
+
+	public BaseRowPythonTableFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo tableFunction,
+		RowType inputType,
+		RowType outputType,
+		int[] udtfInputOffsets) {
+		super(config, tableFunction, inputType, outputType, udtfInputOffsets);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open() throws Exception {
+		super.open();
+		baseRowWrapper = new StreamRecordBaseRowWrappingCollector(output);
+		reuseJoinedRow = new JoinedRow();
+
+		udtfInputProjection = createUdtfInputProjection();
+		udtfOutputTypeSerializer = PythonTypeUtils.toBlinkTypeSerializer(userDefinedFunctionOutputType);
+	}
+
+	@Override
+	public void bufferInput(BaseRow input) {
+		forwardedInputQueue.add(input);
+	}
+
+	@Override
+	public BaseRow getUdfInput(BaseRow element) {
+		return udtfInputProjection.apply(element);
+	}
+
+	@Override
+	public PythonFunctionRunner<BaseRow> createPythonFunctionRunner(
+		FnDataReceiver<byte[]> resultReceiver,
+		PythonEnvironmentManager pythonEnvironmentManager) {
+		return new BaseRowPythonTableFunctionRunner(
+			getRuntimeContext().getTaskName(),
+			resultReceiver,
+			tableFunction,
+			pythonEnvironmentManager,
+			userDefinedFunctionInputType,
+			userDefinedFunctionOutputType);
+	}
+
+	private Projection<BaseRow, BinaryRow> createUdtfInputProjection() {
+		final GeneratedProjection generatedProjection = ProjectionCodeGenerator.generateProjection(
+			CodeGeneratorContext.apply(new TableConfig()),
+			"UdtfInputProjection",
+			inputType,
+			userDefinedFunctionInputType,
+			userDefinedFunctionInputOffsets);
+		// noinspection unchecked
+		return generatedProjection.newInstance(Thread.currentThread().getContextClassLoader());
+	}
+
+	@Override
+	public void emitResults() throws IOException {
+		BaseRow input = null;
+		byte[] rawUdtfResult;
+		while ((rawUdtfResult = userDefinedFunctionResultQueue.poll()) != null) {
+			if (input == null) {
+				input = forwardedInputQueue.poll();
+			}
+			boolean isFinishResult = isFinishResult(rawUdtfResult);
+			if (isFinishResult) {
+				input = forwardedInputQueue.poll();
+			}
 
 Review comment:
   The logic here contains a bug. Take the following case as an example:
   - The first record arrives.
   - However, its result is empty, so the Python side only sends a finish result.
   
   In this case, if it is a left join, we should still output a row (the input with the UDTF fields set to null).
   It would also be good to add a test for this.
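
   The case above can be illustrated with a minimal, self-contained sketch. It does not use the operator's real classes: rows are plain strings, `FINISH` is a hypothetical stand-in for the finish marker sent by the Python side, and `emitResults` is an illustrative helper rather than the method in this PR. It assumes every input produces zero or more results followed by exactly one finish marker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class LeftJoinEmitSketch {

    /** Hypothetical stand-in for the finish marker sent by the Python side. */
    public static final String FINISH = "<finish>";

    /**
     * Drains the result queue and joins each result with the input that produced it.
     * A FINISH marker closes the current input. If no row was emitted for that input
     * and this is a left join, the input is emitted once, padded with null.
     */
    public static List<String> emitResults(
            Queue<String> inputs, Queue<String> results, boolean leftJoin) {
        List<String> out = new ArrayList<>();
        String input = null;
        boolean emittedForInput = false;
        String result;
        while ((result = results.poll()) != null) {
            if (input == null) {
                // Start of the results for the next buffered input.
                input = inputs.poll();
                emittedForInput = false;
            }
            if (FINISH.equals(result)) {
                if (leftJoin && !emittedForInput) {
                    out.add(input + ",null");
                }
                input = null;
            } else {
                out.add(input + "," + result);
                emittedForInput = true;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // "a" produces no rows (only FINISH); "b" produces two rows.
        Queue<String> inputs = new ArrayDeque<>(Arrays.asList("a", "b"));
        Queue<String> results = new ArrayDeque<>(Arrays.asList(FINISH, "1", "2", FINISH));
        System.out.println(emitResults(inputs, results, true)); // prints [a,null, b,1, b,2]
    }
}
```

   With `leftJoin` set to `false`, the same queues yield only the two rows for `b`, which is the behavior an inner join would expect.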


[GitHub] [flink] hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#discussion_r378672265
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/BaseRowPythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.runners.python.BaseRowPythonTableFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.io.IOException;
+
+/**
+ * The Python {@link TableFunction} operator for the blink planner.
+ */
+@Internal
+public class BaseRowPythonTableFunctionOperator
+	extends AbstractPythonTableFunctionOperator<BaseRow, BaseRow, BaseRow> {
+
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The collector used to collect records.
+	 */
+	private transient StreamRecordBaseRowWrappingCollector baseRowWrapper;
+
+	/**
+	 * The JoinedRow reused holding the execution result.
+	 */
+	private transient JoinedRow reuseJoinedRow;
+
+	/**
+	 * The Projection which projects the udtf input fields from the input row.
+	 */
+	private transient Projection<BaseRow, BinaryRow> udtfInputProjection;
+
+	/**
+	 * The TypeSerializer for udtf execution results.
+	 */
+	private transient TypeSerializer<BaseRow> udtfOutputTypeSerializer;
+
+	/**
+	 * The type serializer for the forwarded fields.
+	 */
+	private transient BaseRowSerializer forwardedInputSerializer;
+
+	public BaseRowPythonTableFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo tableFunction,
+		RowType inputType,
+		RowType outputType,
+		int[] udtfInputOffsets,
+		JoinRelType joinType) {
+		super(config, tableFunction, inputType, outputType, udtfInputOffsets, joinType);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open() throws Exception {
+		super.open();
+		baseRowWrapper = new StreamRecordBaseRowWrappingCollector(output);
+		reuseJoinedRow = new JoinedRow();
+
+		udtfInputProjection = createUdtfInputProjection();
+		forwardedInputSerializer = new BaseRowSerializer(this.getExecutionConfig(), inputType);
+		udtfOutputTypeSerializer = PythonTypeUtils.toBlinkTypeSerializer(userDefinedFunctionOutputType);
+	}
+
+	@Override
+	public void bufferInput(BaseRow input) {
+		// always copy the input BaseRow
+		BaseRow forwardedFields = forwardedInputSerializer.copy(input);
+		forwardedFields.setHeader(input.getHeader());
+		forwardedInputQueue.add(input);
 
 Review comment:
   This should buffer the copy rather than the original input: `forwardedInputQueue.add(forwardedFields);`
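
   A small sketch of why the distinction matters, assuming the caller may reuse and overwrite the input row object after `bufferInput` returns. `MutableRow` and `firstBufferedValue` are hypothetical names used only for illustration; they are not Flink classes.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class BufferCopySketch {

    /** Hypothetical mutable row, standing in for a row object that the caller reuses. */
    public static final class MutableRow {
        public final int[] fields;

        public MutableRow(int... fields) {
            this.fields = fields;
        }

        public MutableRow copy() {
            return new MutableRow(Arrays.copyOf(fields, fields.length));
        }
    }

    /**
     * Buffers a row, lets the caller overwrite the original object,
     * then returns the first field of the buffered row.
     */
    public static int firstBufferedValue(boolean copyBeforeBuffering) {
        Queue<MutableRow> queue = new ArrayDeque<>();
        MutableRow reused = new MutableRow(1);
        queue.add(copyBeforeBuffering ? reused.copy() : reused);
        // The caller reuses the same object for the next record.
        reused.fields[0] = 2;
        return queue.poll().fields[0];
    }

    public static void main(String[] args) {
        System.out.println(firstBufferedValue(true));  // prints 1
        System.out.println(firstBufferedValue(false)); // prints 2
    }
}
```

   Buffering the reference lets the later write leak into the queue, so making a copy is only useful if the copy is what gets added.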


[GitHub] [flink] flinkbot commented on issue #11044: [FLINK-15913][python] Add Python Table Function Runner And Operator In Blink Planner

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11044: [FLINK-15913][python] Add Python Table Function Runner And Operator In Blink Planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583937848
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 507d7bb6076e4bc176c46f52e823762d264f3c28 (Mon Feb 10 03:13:40 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-15913).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148130015 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726376 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5124 TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148731238 TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148731238 TriggerType:MANUAL TriggerID:585627656
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148130015) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985) 
   * e0be5057cfc067796c51c4917f5ca2c824ee7872 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726376) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120) 
   * d35fcc7bee22f220f89314db15e1eda13f1bba02 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148731238) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5124) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] HuangXingBo commented on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-585560961
 
 
   Thanks a lot for the review, @hequn8128. I have addressed the comments in the latest commit.


[GitHub] [flink] hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#discussion_r378213520
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/BaseRowPythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.runners.python.BaseRowPythonTableFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.io.IOException;
+
+/**
+ * The Python {@link TableFunction} operator for the blink planner.
+ */
+@Internal
+public class BaseRowPythonTableFunctionOperator
+	extends AbstractPythonTableFunctionOperator<BaseRow, BaseRow, BaseRow> {
+
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The collector used to collect records.
+	 */
+	private transient StreamRecordBaseRowWrappingCollector baseRowWrapper;
+
+	/**
+	 * The JoinedRow reused holding the execution result.
+	 */
+	private transient JoinedRow reuseJoinedRow;
+
+	/**
+	 * The Projection which projects the udtf input fields from the input row.
+	 */
+	private transient Projection<BaseRow, BinaryRow> udtfInputProjection;
+
+	/**
+	 * The TypeSerializer for udtf execution results.
+	 */
+	private transient TypeSerializer<BaseRow> udtfOutputTypeSerializer;
+
+	public BaseRowPythonTableFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo tableFunction,
+		RowType inputType,
+		RowType outputType,
+		int[] udtfInputOffsets) {
+		super(config, tableFunction, inputType, outputType, udtfInputOffsets);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open() throws Exception {
+		super.open();
+		baseRowWrapper = new StreamRecordBaseRowWrappingCollector(output);
+		reuseJoinedRow = new JoinedRow();
+
+		udtfInputProjection = createUdtfInputProjection();
+		udtfOutputTypeSerializer = PythonTypeUtils.toBlinkTypeSerializer(userDefinedFunctionOutputType);
+	}
+
+	@Override
+	public void bufferInput(BaseRow input) {
+		forwardedInputQueue.add(input);
 
 Review comment:
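   Take object reuse into consideration: if object reuse is enabled, the `input` instance may be overwritten after it is added to `forwardedInputQueue`, so it likely needs to be copied before buffering.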
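
A minimal, self-contained sketch of the hazard, assuming a hypothetical `MutableRow` class in place of Flink's `BaseRow`. The names `MutableRow`, `copy` and `bufferAll` are made up for illustration and are not part of this pull request; the real fix would presumably go through a row serializer rather than a hand-written `copy`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ObjectReuseSketch {

    /** Hypothetical stand-in for a mutable row; this is not Flink's BaseRow. */
    static final class MutableRow {
        String value;

        MutableRow(String value) {
            this.value = value;
        }

        MutableRow copy() {
            return new MutableRow(value);
        }
    }

    /** Buffers three records and returns the values found in the queue afterwards. */
    static List<String> bufferAll(boolean copyBeforeBuffering) {
        Queue<MutableRow> forwardedInputQueue = new ArrayDeque<>();
        // The simulated upstream reuses one instance for every record.
        MutableRow reused = new MutableRow(null);
        for (String v : new String[] {"a", "b", "c"}) {
            reused.value = v;
            forwardedInputQueue.add(copyBeforeBuffering ? reused.copy() : reused);
        }
        List<String> seen = new ArrayList<>();
        for (MutableRow row : forwardedInputQueue) {
            seen.add(row.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Without a copy, every buffered entry points at the same reused instance.
        System.out.println(bufferAll(false));
        // With a copy, each buffered entry keeps the value it had when buffered.
        System.out.println(bufferAll(true));
    }
}
```

Without the copy, all queue entries alias the single reused instance and end up showing the last value written to it.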

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148130015 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726376 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5124 TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148731238 TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148130015) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985) 
   * e0be5057cfc067796c51c4917f5ca2c824ee7872 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726376) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120) 
   * d35fcc7bee22f220f89314db15e1eda13f1bba02 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148731238) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5124) 
   

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#discussion_r378223362
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/BaseRowPythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,148 @@
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.runners.python.BaseRowPythonTableFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.io.IOException;
+
+/**
+ * The Python {@link TableFunction} operator for the blink planner.
+ */
+@Internal
+public class BaseRowPythonTableFunctionOperator
+	extends AbstractPythonTableFunctionOperator<BaseRow, BaseRow, BaseRow> {
+
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The collector used to collect records.
+	 */
+	private transient StreamRecordBaseRowWrappingCollector baseRowWrapper;
+
+	/**
+	 * The JoinedRow reused holding the execution result.
+	 */
+	private transient JoinedRow reuseJoinedRow;
+
+	/**
+	 * The Projection which projects the udtf input fields from the input row.
+	 */
+	private transient Projection<BaseRow, BinaryRow> udtfInputProjection;
+
+	/**
+	 * The TypeSerializer for udtf execution results.
+	 */
+	private transient TypeSerializer<BaseRow> udtfOutputTypeSerializer;
+
+	public BaseRowPythonTableFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo tableFunction,
+		RowType inputType,
+		RowType outputType,
+		int[] udtfInputOffsets) {
+		super(config, tableFunction, inputType, outputType, udtfInputOffsets);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open() throws Exception {
+		super.open();
+		baseRowWrapper = new StreamRecordBaseRowWrappingCollector(output);
+		reuseJoinedRow = new JoinedRow();
+
+		udtfInputProjection = createUdtfInputProjection();
+		udtfOutputTypeSerializer = PythonTypeUtils.toBlinkTypeSerializer(userDefinedFunctionOutputType);
+	}
+
+	@Override
+	public void bufferInput(BaseRow input) {
+		forwardedInputQueue.add(input);
+	}
+
+	@Override
+	public BaseRow getUdfInput(BaseRow element) {
 
 Review comment:
   Rename this method? It is strange to use the name `udf` in the udtf class. Maybe `getFunctionInput`?

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-585627656
 
 
   @flinkbot run travis

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148130015 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148130015) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985) 
   * e0be5057cfc067796c51c4917f5ca2c824ee7872 UNKNOWN
   

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148130015 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/148726376 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148130015) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985) 
   * e0be5057cfc067796c51c4917f5ca2c824ee7872 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/148726376) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120) 
   * d35fcc7bee22f220f89314db15e1eda13f1bba02 UNKNOWN
   

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148130015 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/148726376 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5124 TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   Hash:d35fcc7bee22f220f89314db15e1eda13f1bba02 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/148731238 TriggerType:PUSH TriggerID:d35fcc7bee22f220f89314db15e1eda13f1bba02
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148130015) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985) 
   * e0be5057cfc067796c51c4917f5ca2c824ee7872 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/148726376) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120) 
   * d35fcc7bee22f220f89314db15e1eda13f1bba02 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/148731238) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5124) 
   

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python Table Function Runner And Operator In Blink Planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python Table Function Runner And Operator In Blink Planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/148130015 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/148130015) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985) 
   

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#discussion_r378674085
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractPythonTableFunctionOperator.java
 ##########
 @@ -45,14 +47,21 @@
 	 */
 	protected final PythonFunctionInfo tableFunction;
 
+	/**
+	 * The correlate join type.
+	 */
+	protected final JoinRelType joinType;
+
 	public AbstractPythonTableFunctionOperator(
 		Configuration config,
 		PythonFunctionInfo tableFunction,
 		RowType inputType,
 		RowType outputType,
-		int[] udtfInputOffsets) {
+		int[] udtfInputOffsets,
+		JoinRelType joinType) {
 		super(config, inputType, outputType, udtfInputOffsets);
 		this.tableFunction = Preconditions.checkNotNull(tableFunction);
+		this.joinType = Preconditions.checkNotNull(joinType);
 
 Review comment:
   Check that the join type is either INNER or LEFT.
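
A minimal sketch of such a check, assuming a hypothetical `JoinType` enum as a stand-in for Calcite's `JoinRelType` (only a few values are listed here) and a hypothetical helper `checkSupported`. Neither name comes from this pull request; in the operator itself the same condition could presumably be expressed with `Preconditions.checkArgument`.

```java
final class JoinTypeCheckSketch {

    /** Hypothetical stand-in for Calcite's JoinRelType, reduced to four values. */
    enum JoinType { INNER, LEFT, RIGHT, FULL }

    /** Returns the join type if it is INNER or LEFT, otherwise fails fast. */
    static JoinType checkSupported(JoinType joinType) {
        if (joinType == null) {
            throw new NullPointerException("joinType must not be null");
        }
        if (joinType != JoinType.INNER && joinType != JoinType.LEFT) {
            throw new IllegalArgumentException(
                "Unsupported correlate join type: " + joinType);
        }
        return joinType;
    }

    public static void main(String[] args) {
        // Supported types pass through unchanged.
        System.out.println(checkSupported(JoinType.LEFT));
        // Unsupported types are rejected at construction time.
        try {
            checkSupported(JoinType.FULL);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing in the constructor keeps an unsupported join type from surfacing later as wrong results at runtime.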

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/148130015 TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/148726376 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   Hash:e0be5057cfc067796c51c4917f5ca2c824ee7872 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120 TriggerType:PUSH TriggerID:e0be5057cfc067796c51c4917f5ca2c824ee7872
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/148130015) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4985) 
   * e0be5057cfc067796c51c4917f5ca2c824ee7872 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/148726376) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5120) 
   

----------------------------------------------------------------

[GitHub] [flink] HuangXingBo commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11044: [FLINK-15913][python] Add Python TableFunction Runner and Operator in Blink planner
URL: https://github.com/apache/flink/pull/11044#discussion_r378658794
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/BaseRowPythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,148 @@
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.runners.python.BaseRowPythonTableFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.io.IOException;
+
+/**
+ * The Python {@link TableFunction} operator for the blink planner.
+ */
+@Internal
+public class BaseRowPythonTableFunctionOperator
+	extends AbstractPythonTableFunctionOperator<BaseRow, BaseRow, BaseRow> {
+
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The collector used to collect records.
+	 */
+	private transient StreamRecordBaseRowWrappingCollector baseRowWrapper;
+
+	/**
+	 * The JoinedRow reused holding the execution result.
+	 */
+	private transient JoinedRow reuseJoinedRow;
+
+	/**
+	 * The Projection which projects the udtf input fields from the input row.
+	 */
+	private transient Projection<BaseRow, BinaryRow> udtfInputProjection;
+
+	/**
+	 * The TypeSerializer for udtf execution results.
+	 */
+	private transient TypeSerializer<BaseRow> udtfOutputTypeSerializer;
+
+	public BaseRowPythonTableFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo tableFunction,
+		RowType inputType,
+		RowType outputType,
+		int[] udtfInputOffsets) {
+		super(config, tableFunction, inputType, outputType, udtfInputOffsets);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open() throws Exception {
+		super.open();
+		baseRowWrapper = new StreamRecordBaseRowWrappingCollector(output);
+		reuseJoinedRow = new JoinedRow();
+
+		udtfInputProjection = createUdtfInputProjection();
+		udtfOutputTypeSerializer = PythonTypeUtils.toBlinkTypeSerializer(userDefinedFunctionOutputType);
+	}
+
+	@Override
+	public void bufferInput(BaseRow input) {
+		forwardedInputQueue.add(input);
+	}
+
+	@Override
+	public BaseRow getUdfInput(BaseRow element) {
+		return udtfInputProjection.apply(element);
+	}
+
+	@Override
+	public PythonFunctionRunner<BaseRow> createPythonFunctionRunner(
+		FnDataReceiver<byte[]> resultReceiver,
+		PythonEnvironmentManager pythonEnvironmentManager) {
+		return new BaseRowPythonTableFunctionRunner(
+			getRuntimeContext().getTaskName(),
+			resultReceiver,
+			tableFunction,
+			pythonEnvironmentManager,
+			userDefinedFunctionInputType,
+			userDefinedFunctionOutputType);
+	}
+
+	private Projection<BaseRow, BinaryRow> createUdtfInputProjection() {
+		final GeneratedProjection generatedProjection = ProjectionCodeGenerator.generateProjection(
+			CodeGeneratorContext.apply(new TableConfig()),
+			"UdtfInputProjection",
+			inputType,
+			userDefinedFunctionInputType,
+			userDefinedFunctionInputOffsets);
+		// noinspection unchecked
+		return generatedProjection.newInstance(Thread.currentThread().getContextClassLoader());
+	}
+
+	@Override
+	public void emitResults() throws IOException {
+		BaseRow input = null;
+		byte[] rawUdtfResult;
+		while ((rawUdtfResult = userDefinedFunctionResultQueue.poll()) != null) {
+			if (input == null) {
+				input = forwardedInputQueue.poll();
+			}
+			boolean isFinishResult = isFinishResult(rawUdtfResult);
+			if (isFinishResult) {
+				input = forwardedInputQueue.poll();
+			}
 
 Review comment:
   Good catch!

----------------------------------------------------------------

[GitHub] [flink] flinkbot commented on issue #11044: [FLINK-15913][python] Add Python Table Function Runner And Operator In Blink Planner

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11044: [FLINK-15913][python] Add Python Table Function Runner And Operator In Blink Planner
URL: https://github.com/apache/flink/pull/11044#issuecomment-583941299
 
 
   <!--
   Meta data
   Hash:507d7bb6076e4bc176c46f52e823762d264f3c28 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:507d7bb6076e4bc176c46f52e823762d264f3c28
   -->
   ## CI report:
   
   * 507d7bb6076e4bc176c46f52e823762d264f3c28 UNKNOWN
   

----------------------------------------------------------------