Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/05 10:05:05 UTC

[GitHub] [flink] HuangXingBo opened a new pull request #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner

HuangXingBo opened a new pull request #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner
URL: https://github.com/apache/flink/pull/11020
 
 
   ## What is the purpose of the change
   
   *This pull request adds the Python Table Function Runner and Operator for executing Python user-defined table functions in the legacy planner.*
   
   ## Brief change log
     - *Add AbstractPythonTableFunctionOperator and PythonTableFunctionOperator*
     - *Add AbstractPythonStatelessFunctionRunner*
     - *Add AbstractPythonTableFunctionRunner and PythonTableFunctionRunner*
     - *Add TableSerializer and RowTableSerializer*
     - *Add Table Type in flink-fn-execution.proto*
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
     - *Add runner test in PythonTableFunctionRunnerTest*
     - *Add operator test in PythonTableFunctionOperatorTestBase*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] HuangXingBo commented on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-583394809
 
 
   Thanks a lot for the review, @hequn8128. I have addressed the comments in the latest commit.


[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375688027
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/TableSerializer.java
 ##########
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+
+/**
+ * Base Table Serializer for Table Function.
+ */
+@Internal
+abstract class TableSerializer<T> extends TypeSerializer<T> {
 
 Review comment:
   Rename this to `PythonTableFunctionSerializer`?


[GitHub] [flink] flinkbot commented on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   <!--
   Meta data
   Hash:024d27cfa21d92931533773e11b6b672baebad2e Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:024d27cfa21d92931533773e11b6b672baebad2e
   -->
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375805768
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractPythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * @param <IN>      Type of the input elements.
+ * @param <OUT>     Type of the output elements.
+ * @param <UDTFIN>  Type of the UDTF input.
+ * @param <UDTFOUT> Type of the UDTF output.
+ */
+public abstract class AbstractPythonTableFunctionOperator<IN, OUT, UDTFIN, UDTFOUT>
 
 Review comment:
   Add `@Internal`


[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   <!--
   Meta data
   Hash:024d27cfa21d92931533773e11b6b672baebad2e Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/147523105 TriggerType:PUSH TriggerID:024d27cfa21d92931533773e11b6b672baebad2e
   Hash:024d27cfa21d92931533773e11b6b672baebad2e Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857 TriggerType:PUSH TriggerID:024d27cfa21d92931533773e11b6b672baebad2e
   Hash:a2f0e14c373b5fb8aa56ec8371899e549e8859fa Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/147536855 TriggerType:PUSH TriggerID:a2f0e14c373b5fb8aa56ec8371899e549e8859fa
   Hash:a2f0e14c373b5fb8aa56ec8371899e549e8859fa Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862 TriggerType:PUSH TriggerID:a2f0e14c373b5fb8aa56ec8371899e549e8859fa
   Hash:e2ea6049aa8c495dda6b78395435170fa0c56b8c Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:e2ea6049aa8c495dda6b78395435170fa0c56b8c
   -->
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147536855) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862) 
   * e2ea6049aa8c495dda6b78395435170fa0c56b8c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375806263
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/AbstractPythonTableFunctionRunner.java
 ##########
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.runners.python;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+/**
+ * Abstract {@link PythonFunctionRunner} used to execute Python {@link TableFunction}.
+ *
+ * @param <IN>  Type of the input elements.
+ * @param <OUT> Type of the execution results.
+ */
+public abstract class AbstractPythonTableFunctionRunner<IN, OUT> extends AbstractPythonStatelessFunctionRunner<IN, OUT> {
 
 Review comment:
   Add `@Internal`


[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375802345
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/PythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.runners.python.PythonTableFunctionRunner;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+/**
+ * The Python {@link TableFunction} operator for the legacy planner.
+ */
+public class PythonTableFunctionOperator extends AbstractPythonTableFunctionOperator<CRow, CRow, Row, Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The collector used to collect records.
+	 */
+	private transient StreamRecordCRowWrappingCollector cRowWrapper;
+
+	public PythonTableFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo tableFunction,
+		RowType inputType,
+		RowType outputType,
+		int[] udtfInputOffsets) {
+		super(config, tableFunction, inputType, outputType, udtfInputOffsets);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		this.cRowWrapper = new StreamRecordCRowWrappingCollector(output);
+	}
+
+	private boolean isFinishResult(Row result) {
+		return result.getArity() == 0;
+	}
+
+	@Override
+	public void emitResults() {
+		Row udtfResult;
+		CRow input = null;
+		while ((udtfResult = udtfResultQueue.poll()) != null) {
+			if (input == null) {
+				input = forwardedInputQueue.poll();
+			}
+			if (isFinishResult(udtfResult)) {
+				input = forwardedInputQueue.poll();
+			}
+			if (input != null && !isFinishResult(udtfResult)) {
+				cRowWrapper.setChange(input.change());
+				cRowWrapper.collect(Row.join(input.row(), udtfResult));
+			}
+		}
+	}
+
+	@Override
+	public void bufferInput(CRow input) {
+		forwardedInputQueue.add(input);
+	}
+
+	@Override
+	public Row getUdtfInput(CRow element) {
+		return Row.project(element.row(), udtfInputOffsets);
+	}
+
+	@Override
+	public PythonFunctionRunner<Row> createPythonFunctionRunner(
+		FnDataReceiver<Row> resultReceiver,
+		PythonEnvironmentManager pythonEnvironmentManager) {
+		return new PythonTableFunctionRunner(
+			getRuntimeContext().getTaskName(),
+			resultReceiver,
+			tableFunction,
+			pythonEnvironmentManager,
+			udtfInputType,
+			udtfOutputType);
+	}
+
+	/**
+	 * The collector is used to convert a {@link Row} to a {@link CRow}.
+	 */
+	private static class StreamRecordCRowWrappingCollector implements Collector<Row> {
 
 Review comment:
   This class is copied from `PythonScalarFunctionOperator`. I think we can do some code reuse.
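
   One way to get that reuse would be to lift the nested collector into a top-level class that both operators instantiate. The sketch below only shows the shape of such a class. `RecordSink`, `FlaggedRow`, and `FlaggedRowWrappingCollector` are hypothetical stand-ins for Flink's `Collector`, `CRow`, and the nested collector, so that the example compiles without Flink on the classpath:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's Collector interface (not the real API).
interface RecordSink<T> {
    void collect(T record);
}

// Simplified stand-in for CRow: a row payload plus a change flag.
final class FlaggedRow {
    final Object[] row;
    final boolean change;

    FlaggedRow(Object[] row, boolean change) {
        this.row = row;
        this.change = change;
    }
}

// Hypothetical top-level collector that the scalar and the table function
// operator could both use instead of each nesting its own copy.
final class FlaggedRowWrappingCollector implements RecordSink<Object[]> {
    private final RecordSink<FlaggedRow> out;
    private boolean change = true;

    FlaggedRowWrappingCollector(RecordSink<FlaggedRow> out) {
        this.out = out;
    }

    // Sets the change flag attached to every row collected afterwards.
    void setChange(boolean change) {
        this.change = change;
    }

    @Override
    public void collect(Object[] row) {
        out.collect(new FlaggedRow(row, change));
    }
}

final class SharedCollectorDemo {
    public static void main(String[] args) {
        List<FlaggedRow> emitted = new ArrayList<>();
        FlaggedRowWrappingCollector collector = new FlaggedRowWrappingCollector(emitted::add);
        collector.setChange(false);
        collector.collect(new Object[] {"a", 1});
        System.out.println(emitted.size() + " " + emitted.get(0).change);
    }
}
```

   Whether the shared class should live next to the operators or in a utility package is a separate decision that this sketch does not address.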


[GitHub] [flink] HuangXingBo commented on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-583281187
 
 
   Thanks a lot for the review, @hequn8128. I have addressed the comments in the latest commit.


[GitHub] [flink] flinkbot commented on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582333482
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 024d27cfa21d92931533773e11b6b672baebad2e (Wed Feb 05 10:08:29 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-15913).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375780067
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
 ##########
 @@ -90,6 +91,39 @@ public static TypeSerializer toBlinkTypeSerializer(LogicalType logicalType) {
 		return logicalType.accept(new LogicalTypeToProtoTypeConverter());
 	}
 
+	public static TypeSerializer toFlinkTableTypeSerializer(LogicalType logicalType) {
+		RowType rowType = (RowType) logicalType;
+		LogicalTypeDefaultVisitor<TypeSerializer> converter =
+			new LogicalTypeToTypeSerializerConverter();
+		final TypeSerializer[] fieldTypeSerializers = rowType.getFields()
+			.stream()
+			.map(f -> f.getType().accept(converter))
+			.toArray(TypeSerializer[]::new);
+		return new RowTableSerializer(fieldTypeSerializers);
+	}
+
+	public static FlinkFnApi.Schema.FieldType toTableProtoType(LogicalType logicalType) {
 
 Review comment:
   Rename the method to `toTableFunctionProtoType`.


[GitHub] [flink] HuangXingBo commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r376213118
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/TableSerializer.java
 ##########
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+
+/**
+ * Base Table Serializer for Table Function.
+ */
+@Internal
+abstract class TableSerializer<T> extends TypeSerializer<T> {
 
 Review comment:
   After rebasing onto the PR for FLINK-15897, I think we can remove the TableFunctionSerializer, because we can decide whether a message is the finish message from the length and content of the message byte array.
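
   A minimal sketch of such a check, assuming the finish message is a single byte with value 0x00 (as the operator's Javadoc in this PR describes). The class and method names are hypothetical and not taken from the PR:

```java
// Hypothetical helper; names are illustrative and not taken from the PR.
final class FinishMessageCheck {

    // Assumption: the Python side signals "no more rows for this input"
    // by sending exactly one byte with value 0x00.
    private static final byte FINISH_BYTE = 0x00;

    private FinishMessageCheck() {
    }

    /** Returns true if the raw result is the one-byte finish message. */
    static boolean isFinishMessage(byte[] rawResult) {
        return rawResult != null
            && rawResult.length == 1
            && rawResult[0] == FINISH_BYTE;
    }

    public static void main(String[] args) {
        System.out.println(isFinishMessage(new byte[] {0x00}));       // true
        System.out.println(isFinishMessage(new byte[] {0x00, 0x01})); // false
        System.out.println(isFinishMessage(new byte[] {0x01}));       // false
    }
}
```

   This only works if a regular serialized result can never be exactly that one byte. That depends on the row encoding in use and is not verified here.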


[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r376374541
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractPythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @param <IN>     Type of the input elements.
+ * @param <OUT>    Type of the output elements.
+ * @param <UDTFIN> Type of the UDTF input type.
+ */
+@Internal
+public abstract class AbstractPythonTableFunctionOperator<IN, OUT, UDTFIN>
+	extends AbstractStatelessFunctionOperator<IN, OUT, UDTFIN> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The Python {@link TableFunction} to be executed.
+	 */
+	protected final PythonFunctionInfo tableFunction;
+
+	public AbstractPythonTableFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo tableFunction,
+		RowType inputType,
+		RowType outputType,
+		int[] udtfInputOffsets) {
+		super(config, inputType, outputType, udtfInputOffsets);
+		this.tableFunction = Preconditions.checkNotNull(tableFunction);
+	}
+
+	@Override
+	public void open() throws Exception {
+		List<RowType.RowField> udtfOutputDataFields = new ArrayList<>(
+			outputType.getFields().subList(inputType.getFieldCount(), outputType.getFieldCount()));
+		udfOutputType = new RowType(udtfOutputDataFields);
+		super.open();
+	}
+
+	@Override
+	public PythonEnv getPythonEnv() {
+		return tableFunction.getPythonFunction().getPythonEnv();
+	}
+
+	/**
+	 * The received udtf execution result is a finish message when it is a byte 0x00.
 
 Review comment:
   when it is a byte 0x00 => when it is a byte with value 0x00
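
   For context on why the finish message matters: a table function may emit zero or more rows per input, so the operator needs a marker that tells it when to move on to the next buffered input. The sketch below illustrates that pairing with plain Java queues. It is not the operator code from this PR; `UdtfMessage` and `FinishMarkerPairing` are hypothetical, and strings stand in for rows:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;

// Stand-in for one message from the Python worker: a result row or the finish marker.
final class UdtfMessage {
    final String value; // null marks the finish message in this sketch

    private UdtfMessage(String value) {
        this.value = value;
    }

    static UdtfMessage row(String value) {
        return new UdtfMessage(Objects.requireNonNull(value));
    }

    static UdtfMessage finish() {
        return new UdtfMessage(null);
    }

    boolean isFinish() {
        return value == null;
    }
}

final class FinishMarkerPairing {

    // Joins each result row with the input at the head of the buffer and
    // advances to the next input whenever a finish marker arrives.
    static List<String> join(Queue<String> bufferedInputs, Queue<UdtfMessage> messages) {
        List<String> joined = new ArrayList<>();
        UdtfMessage message;
        while ((message = messages.poll()) != null) {
            if (message.isFinish()) {
                bufferedInputs.poll(); // all rows for the head input have been seen
                continue;
            }
            String input = bufferedInputs.peek();
            if (input == null) {
                throw new IllegalStateException("Result row without a buffered input");
            }
            joined.add(input + "|" + message.value);
        }
        return joined;
    }

    public static void main(String[] args) {
        Queue<String> inputs = new ArrayDeque<>(List.of("a", "b", "c"));
        Queue<UdtfMessage> messages = new ArrayDeque<>(List.of(
            UdtfMessage.row("a1"), UdtfMessage.row("a2"), UdtfMessage.finish(),
            UdtfMessage.finish(), // input "b" produced no rows
            UdtfMessage.row("c1"), UdtfMessage.finish()));
        System.out.println(join(inputs, messages)); // [a|a1, a|a2, c|c1]
    }
}
```

   Without the marker, the rows `a2` and `c1` could not be attributed to the right input, because input `b` contributes nothing to the result stream.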


[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147536855) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862) 
   * e2ea6049aa8c495dda6b78395435170fa0c56b8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147886372) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4932) 
   * 116926b9f65acc72ae3058efb19a47ce8ca0f412 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/147928631) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4954) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375790325
 
 

 ##########
 File path: flink-python/pyflink/proto/flink-fn-execution.proto
 ##########
 @@ -73,6 +73,7 @@ message Schema {
     ARRAY = 16;
     MAP = 17;
     MULTISET = 18;
+    TABLE = 19;
 
 Review comment:
   Can we reuse the ROW Type here? A ROW type for the TableFunction should be a TABLE type.

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147536855) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862) 
   

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r376367844
 
 

 ##########
 File path: flink-python/pyflink/proto/flink-fn-execution.proto
 ##########
 @@ -73,6 +73,7 @@ message Schema {
     ARRAY = 16;
     MAP = 17;
     MULTISET = 18;
+    TABLEFUNCTIONROW = 19;
 
 Review comment:
   TABLE_FUNCTION_ROW ?

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147536855) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862) 
   * e2ea6049aa8c495dda6b78395435170fa0c56b8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147886372) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4932) 
   * 116926b9f65acc72ae3058efb19a47ce8ca0f412 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147928631) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4954) 
   

----------------------------------------------------------------

[GitHub] [flink] HuangXingBo commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r376213859
 
 

 ##########
 File path: flink-python/pyflink/proto/flink-fn-execution.proto
 ##########
 @@ -73,6 +73,7 @@ message Schema {
     ARRAY = 16;
     MAP = 17;
     MULTISET = 18;
+    TABLE = 19;
 
 Review comment:
   Because the proto doesn't include any information indicating that the function is a TableFunction, we need extra information to determine the coder type or function type in Python. Maybe there is a better solution. What's your suggestion?

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   

----------------------------------------------------------------

[GitHub] [flink] HuangXingBo commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r376165491
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/TableSerializer.java
 ##########
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+
+/**
+ * Base Table Serializer for Table Function.
+ */
+@Internal
+abstract class TableSerializer<T> extends TypeSerializer<T> {
 
 Review comment:
   I will add BaseRowTableFunctionSerializer, which extends TableFunctionSerializers, in the next PR.

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147536855) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862) 
   

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375801854
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/PythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.runners.python.PythonTableFunctionRunner;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+/**
+ * The Python {@link TableFunction} operator for the legacy planner.
+ */
+public class PythonTableFunctionOperator extends AbstractPythonTableFunctionOperator<CRow, CRow, Row, Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The collector used to collect records.
+	 */
+	private transient StreamRecordCRowWrappingCollector cRowWrapper;
+
+	public PythonTableFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo tableFunction,
+		RowType inputType,
+		RowType outputType,
+		int[] udtfInputOffsets) {
+		super(config, tableFunction, inputType, outputType, udtfInputOffsets);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		this.cRowWrapper = new StreamRecordCRowWrappingCollector(output);
+	}
+
+	private boolean isFinishResult(Row result) {
+		return result.getArity() == 0;
+	}
+
+	@Override
+	public void emitResults() {
+		Row udtfResult;
+		CRow input = null;
+		while ((udtfResult = udtfResultQueue.poll()) != null) {
+			if (input == null) {
+				input = forwardedInputQueue.poll();
+			}
+			if (isFinishResult(udtfResult)) {
+				input = forwardedInputQueue.poll();
+			}
+			if (input != null && !isFinishResult(udtfResult)) {
+				cRowWrapper.setChange(input.change());
+				cRowWrapper.collect(Row.join(input.row(), udtfResult));
+			}
+		}
+	}
+
+	@Override
+	public void bufferInput(CRow input) {
 
 Review comment:
   Copy the input Row before putting it into the queue if `getExecutionConfig().isObjectReuseEnabled()` returns true.
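
To illustrate why the copy matters, here is a minimal, self-contained sketch that does not use any Flink classes. It assumes a record is modelled as a plain `Object[]` that the caller overwrites after each call, which is what object reuse permits. `BufferingSketch` and its methods are hypothetical names; in the real operator the copy would presumably be made with whatever copy facility the row type provides.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

/** Hypothetical stand-in for an operator that buffers its inputs; not Flink API. */
final class BufferingSketch {

    private final Queue<Object[]> forwardedInputQueue = new ArrayDeque<>();
    private final boolean objectReuseEnabled;

    BufferingSketch(boolean objectReuseEnabled) {
        this.objectReuseEnabled = objectReuseEnabled;
    }

    void bufferInput(Object[] input) {
        // With object reuse the caller may overwrite `input` once this call returns,
        // so the queue must hold its own (here: shallow) copy.
        Object[] toBuffer = objectReuseEnabled ? Arrays.copyOf(input, input.length) : input;
        forwardedInputQueue.add(toBuffer);
    }

    Object[] poll() {
        return forwardedInputQueue.poll();
    }

    public static void main(String[] args) {
        BufferingSketch op = new BufferingSketch(true);
        Object[] reused = {"a", 1};
        op.bufferInput(reused);
        // The upstream overwrites the same instance for the next record.
        reused[0] = "b";
        reused[1] = 2;
        op.bufferInput(reused);
        System.out.println(Arrays.toString(op.poll())); // [a, 1]
        System.out.println(Arrays.toString(op.poll())); // [b, 2]
    }
}
```

Without the copy, both queue entries would reference the same array, and the first buffered record would silently take on the values of the second.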

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375779897
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
 ##########
 @@ -90,6 +91,39 @@ public static TypeSerializer toBlinkTypeSerializer(LogicalType logicalType) {
 		return logicalType.accept(new LogicalTypeToProtoTypeConverter());
 	}
 
+	public static TypeSerializer toFlinkTableTypeSerializer(LogicalType logicalType) {
 
 Review comment:
   Rename the method to `toFlinkTableFunctionTypeSerializer`.

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa UNKNOWN
   

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147536855) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862) 
   * e2ea6049aa8c495dda6b78395435170fa0c56b8c Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/147886372) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4932) 
   

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375797311
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/TableSerializer.java
 ##########
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+
+/**
+ * Base Table Serializer for Table Function.
+ */
+@Internal
+abstract class TableSerializer<T> extends TypeSerializer<T> {
+
+	private final TypeSerializer[] fieldSerializers;
+
+	private transient boolean[] nullMask;
+
+	TableSerializer(TypeSerializer[] fieldSerializers) {
+		this.fieldSerializers = fieldSerializers;
+		this.nullMask = new boolean[Math.max(fieldSerializers.length - 8, 0)];
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<T> duplicate() {
+		throw new RuntimeException("This method duplicate() should not be called");
+	}
+
+	@Override
+	public T createInstance() {
+		return unwantedMethodCall("createInstance()");
+	}
+
+	@Override
+	public T copy(T from) {
+		return unwantedMethodCall("copy(T from)");
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		return unwantedMethodCall("copy(T from, T reuse)");
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return unwantedMethodCall("deserialize(T reuse, DataInputView source)");
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		unwantedMethodCall("copy(DataInputView source, DataOutputView target)");
+	}
+
+	private T unwantedMethodCall(String methodName) {
+		throw new RuntimeException(String.format("The method %s should not be called", methodName));
+	}
+
+	public abstract T createResult(int len);
+
+	public abstract void setField(T result, int index, Object value);
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
 
 Review comment:
   Please add some comments to this method. They would help other readers understand it.
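
For illustration only, a commented deserializer of this kind might look like the sketch below. It is self-contained and does not use Flink's `TypeSerializer` or `NullMaskUtils`. The class name `NullMaskRowSketch`, the int-only fields, and the wire layout (a null mask with one bit per field, followed by the non-null fields) are all assumptions made for the example, not the format used in this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch: a row of nullable ints written as [null mask][non-null fields]. */
public class NullMaskRowSketch {

    /** Writes one mask bit per field (most significant bit first), then every non-null field. */
    public static byte[] serialize(Integer[] fields) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            byte[] mask = new byte[(fields.length + 7) / 8];
            for (int i = 0; i < fields.length; i++) {
                if (fields[i] == null) {
                    // Set bit i to mark the field as null.
                    mask[i / 8] |= (byte) (0x80 >>> (i % 8));
                }
            }
            out.write(mask);
            for (Integer field : fields) {
                if (field != null) {
                    out.writeInt(field);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Reads a row back.
     * Step 1: read the null mask, which tells us which fields are absent from the payload.
     * Step 2: for each field, either set null or read the next value from the stream.
     */
    public static Integer[] deserialize(byte[] data, int arity) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            byte[] mask = new byte[(arity + 7) / 8];
            in.readFully(mask);
            Integer[] result = new Integer[arity];
            for (int i = 0; i < arity; i++) {
                boolean isNull = (mask[i / 8] & (0x80 >>> (i % 8))) != 0;
                result[i] = isNull ? null : in.readInt();
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Comments in this style (what the mask means, and in which order the fields are consumed) are the kind of explanation being asked for here.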

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147536855) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862) 
   * e2ea6049aa8c495dda6b78395435170fa0c56b8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147886372) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4932) 
   * 116926b9f65acc72ae3058efb19a47ce8ca0f412 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r376381992
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/PythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.runners.python.PythonTableFunctionRunner;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.runtime.types.CRowTypeInfo;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.io.IOException;
+
+/**
+ * The Python {@link TableFunction} operator for the legacy planner.
+ */
+@Internal
+public class PythonTableFunctionOperator extends AbstractPythonTableFunctionOperator<CRow, CRow, Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The collector used to collect records.
+	 */
+	private transient StreamRecordCRowWrappingCollector cRowWrapper;
+
+	/**
+	 * The type serializer for the forwarded fields.
+	 */
+	private transient TypeSerializer<CRow> forwardedInputSerializer;
+
+	/**
+	 * The TypeSerializer for udf execution results.
 
 Review comment:
   udf => udtf

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r376379210
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
 ##########
 @@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for all stream operators to execute Python Stateless Functions.
+ *
+ * @param <IN>    Type of the input elements.
+ * @param <OUT>   Type of the output elements.
+ * @param <UDFIN> Type of the UDF input type.
+ */
+@Internal
+public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN>
+	extends AbstractPythonFunctionOperator<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The input logical type.
+	 */
+	protected final RowType inputType;
+
+	/**
+	 * The output logical type.
+	 */
+	protected final RowType outputType;
+
+	/**
+	 * The offsets of udf inputs.
+	 */
+	protected final int[] udfInputOffsets;
+
+	/**
+	 * The udf input logical type.
+	 */
+	protected transient RowType udfInputType;
+
+	/**
+	 * The udf output logical type.
+	 */
+	protected transient RowType udfOutputType;
+
+	/**
+	 * The queue holding the input elements for which the execution results have not been received.
+	 */
+	protected transient LinkedBlockingQueue<IN> forwardedInputQueue;
+
+	/**
+	 * The queue holding the user-defined table function execution results. The execution results
 
 Review comment:
   user-defined table function => user-defined function

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r376382440
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/PythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.runners.python.PythonTableFunctionRunner;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.runtime.types.CRowTypeInfo;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.io.IOException;
+
+/**
+ * The Python {@link TableFunction} operator for the legacy planner.
+ */
+@Internal
+public class PythonTableFunctionOperator extends AbstractPythonTableFunctionOperator<CRow, CRow, Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The collector used to collect records.
+	 */
+	private transient StreamRecordCRowWrappingCollector cRowWrapper;
+
+	/**
+	 * The type serializer for the forwarded fields.
+	 */
+	private transient TypeSerializer<CRow> forwardedInputSerializer;
+
+	/**
+	 * The TypeSerializer for udf execution results.
+	 */
+	private transient TypeSerializer<Row> udtfOutputTypeSerializer;
+
+	public PythonTableFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo tableFunction,
+		RowType inputType,
+		RowType outputType,
+		int[] udtfInputOffsets) {
+		super(config, tableFunction, inputType, outputType, udtfInputOffsets);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open() throws Exception {
+		super.open();
+		this.cRowWrapper = new StreamRecordCRowWrappingCollector(output);
+		CRowTypeInfo forwardedInputTypeInfo = new CRowTypeInfo(
+			new RowTypeInfo(TypeConversions.fromDataTypeToLegacyInfo(
+				TypeConversions.fromLogicalToDataType(inputType))));
+		forwardedInputSerializer = forwardedInputTypeInfo.createSerializer(getExecutionConfig());
+		udtfOutputTypeSerializer = PythonTypeUtils.toFlinkTypeSerializer(udfOutputType);
+	}
+
+	@Override
+	public void emitResults() throws IOException {
+		CRow input = null;
+		byte[] rawUdtfResult;
+		while ((rawUdtfResult = udfResultQueue.poll()) != null) {
+			if (input == null) {
+				input = forwardedInputQueue.poll();
+			}
+			boolean isFinishResult = isFinishResult(rawUdtfResult);
+			if (isFinishResult) {
+				input = forwardedInputQueue.poll();
+			}
+			if (input != null && !isFinishResult) {
+				bais.setBuffer(rawUdtfResult, 0, rawUdtfResult.length);
+				Row udtfResult = udtfOutputTypeSerializer.deserialize(baisWrapper);
+				cRowWrapper.setChange(input.change());
+				cRowWrapper.collect(Row.join(input.row(), udtfResult));
+			}
+		}
+	}
+
+	@Override
+	public void bufferInput(CRow input) {
+		if (getExecutionConfig().isObjectReuseEnabled()) {
+			input =  forwardedInputSerializer.copy(input);
 
 Review comment:
   Remove the extra space after `=`.

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375805279
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractPythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * @param <IN>      Type of the input elements.
+ * @param <OUT>     Type of the output elements.
+ * @param <UDTFIN>  Type of the UDF input type.
+ * @param <UDTFOUT> Type of the UDF input type.
+ */
+public abstract class AbstractPythonTableFunctionOperator<IN, OUT, UDTFIN, UDTFOUT>
 
 Review comment:
   Please try to avoid code duplication. For example, we can add a `StatelessFunctionOperator` and put the shared code in that class. What do you think?
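
As a rough sketch of that suggestion (not the code in this PR), the shared queues and buffering could live in one base class, leaving only the result-pairing logic to each subclass. Every name below (`SharedBaseSketch`, `StatelessOperator`, `ScalarOperator`, `TableOperator`, the `FINISH` marker) is hypothetical. Results are modelled as plain strings and an empty string stands in for the "finished" message of a table function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of a shared base class for stateless function operators. */
public class SharedBaseSketch {

    /** Owns the state and steps that are identical for scalar and table functions. */
    public abstract static class StatelessOperator {
        protected final LinkedBlockingQueue<String> forwardedInputQueue = new LinkedBlockingQueue<>();
        protected final LinkedBlockingQueue<String> resultQueue = new LinkedBlockingQueue<>();
        public final List<String> output = new ArrayList<>();

        /** Shared: remember the input until its result arrives. */
        public final void bufferInput(String input) {
            forwardedInputQueue.add(input);
        }

        /** Shared: called when the (simulated) Python side sends a result. */
        public final void receiveResult(String raw) {
            resultQueue.add(raw);
        }

        /** Specific to the function kind: how results are paired with inputs. */
        public abstract void emitResults();
    }

    /** Scalar function: exactly one result per input. */
    public static final class ScalarOperator extends StatelessOperator {
        @Override
        public void emitResults() {
            String raw;
            while ((raw = resultQueue.poll()) != null) {
                output.add(forwardedInputQueue.poll() + "," + raw);
            }
        }
    }

    /** Table function: zero or more results per input, closed by a finish marker. */
    public static final class TableOperator extends StatelessOperator {
        public static final String FINISH = "";
        private String current;

        @Override
        public void emitResults() {
            String raw;
            while ((raw = resultQueue.poll()) != null) {
                if (current == null) {
                    current = forwardedInputQueue.poll();
                }
                if (raw.equals(FINISH)) {
                    current = null; // the next result belongs to the next input
                } else {
                    output.add(current + "," + raw);
                }
            }
        }
    }
}
```

With this split, a new function kind only has to implement `emitResults()`.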

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147536855) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862) 
   * e2ea6049aa8c495dda6b78395435170fa0c56b8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147886372) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4932) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375778304
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
 ##########
 @@ -90,6 +91,39 @@ public static TypeSerializer toBlinkTypeSerializer(LogicalType logicalType) {
 		return logicalType.accept(new LogicalTypeToProtoTypeConverter());
 	}
 
+	public static TypeSerializer toFlinkTableTypeSerializer(LogicalType logicalType) {
 
 Review comment:
   Since this method is only used by TableFunction, I'd prefer not to add it to this class. How about adding it to `PythonTableFunctionRunner`?
   
   Same for `toTableProtoType`.

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375785044
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/TableSerializer.java
 ##########
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+
+/**
+ * Base Table Serializer for Table Function.
+ */
+@Internal
+abstract class TableSerializer<T> extends TypeSerializer<T> {
 
 Review comment:
   Do we have any other TableFunctionSerializers besides RowTableFunctionSerializer, for example a TupleTableFunctionSerializer? If there are no other serializers, I'd prefer not to add this abstract class. Instead, simply adding a RowTableFunctionSerializer that extends TypeSerializer would be enough. What do you think?

----------------------------------------------------------------

[GitHub] [flink] hequn8128 merged pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 merged pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020
 
 
   

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-583930209
 
 
   It seems the test failure has nothing to do with this PR. I will merge this.

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r376380811
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
 ##########
 @@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for all stream operators to execute Python Stateless Functions.
+ *
+ * @param <IN>    Type of the input elements.
+ * @param <OUT>   Type of the output elements.
+ * @param <UDFIN> Type of the UDF input type.
+ */
+@Internal
+public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN>
+	extends AbstractPythonFunctionOperator<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The input logical type.
+	 */
+	protected final RowType inputType;
+
+	/**
+	 * The output logical type.
+	 */
+	protected final RowType outputType;
+
+	/**
+	 * The offsets of udf inputs.
+	 */
+	protected final int[] udfInputOffsets;
+
+	/**
+	 * The udf input logical type.
+	 */
+	protected transient RowType udfInputType;
+
+	/**
+	 * The udf output logical type.
+	 */
+	protected transient RowType udfOutputType;
 
 Review comment:
   How about renaming `udfOutputType` to `functionOutputType` or `userDefinedFunctionOutputType`? It seems strange to see the `udfOutputType` variable in `AbstractPythonTableFunctionOperator`. 
   
   Same for other variable names.

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375806107
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/PythonTableFunctionOperator.java
 ##########
 @@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.runners.python.PythonTableFunctionRunner;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+/**
+ * The Python {@link TableFunction} operator for the legacy planner.
+ */
+public class PythonTableFunctionOperator extends AbstractPythonTableFunctionOperator<CRow, CRow, Row, Row> {
 
 Review comment:
   Add `@Internal`

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375778883
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
 ##########
 @@ -90,6 +91,39 @@ public static TypeSerializer toBlinkTypeSerializer(LogicalType logicalType) {
 		return logicalType.accept(new LogicalTypeToProtoTypeConverter());
 	}
 
+	public static TypeSerializer toFlinkTableTypeSerializer(LogicalType logicalType) {
+		RowType rowType = (RowType) logicalType;
 
 Review comment:
   It's not good to cast here. It's better to change the method from `toFlinkTableTypeSerializer(LogicalType logicalType)` to `toFlinkTableTypeSerializer(RowType logicalType)`
   
   Same for `toTableProtoType`.
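
A minimal sketch of the suggestion above, assuming hypothetical stand-in types (`LogicalTypeSketch`, `RowTypeSketch`, `IntTypeSketch`) rather than Flink's real `LogicalType` and `RowType`. It only illustrates the general point: with a narrowed parameter type, a wrong argument is rejected by the compiler instead of failing with a `ClassCastException` at runtime.

```java
import java.util.Arrays;
import java.util.List;

public class NarrowParameterSketch {

    // Hypothetical stand-ins for a type hierarchy; these are NOT Flink classes.
    public interface LogicalTypeSketch {}

    public static final class IntTypeSketch implements LogicalTypeSketch {}

    public static final class RowTypeSketch implements LogicalTypeSketch {
        private final List<String> fieldNames;

        public RowTypeSketch(String... fieldNames) {
            this.fieldNames = Arrays.asList(fieldNames);
        }

        public int getFieldCount() {
            return fieldNames.size();
        }
    }

    // Before: accepts any logical type and casts, so a wrong argument
    // is only detected when the method runs.
    public static int fieldCountWithCast(LogicalTypeSketch logicalType) {
        RowTypeSketch rowType = (RowTypeSketch) logicalType; // may throw ClassCastException
        return rowType.getFieldCount();
    }

    // After: the signature states the requirement, so no cast is needed.
    public static int fieldCount(RowTypeSketch rowType) {
        return rowType.getFieldCount();
    }

    public static void main(String[] args) {
        RowTypeSketch row = new RowTypeSketch("a", "b");
        System.out.println(fieldCount(row));
        try {
            fieldCountWithCast(new IntTypeSketch());
        } catch (ClassCastException e) {
            System.out.println("cast failed at runtime");
        }
        // fieldCount(new IntTypeSketch()); // would not compile
    }
}
```

Callers that only hold a general logical type would then have to cast at the call site, which at least keeps the assumption visible where it is made.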

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147536855) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862) 
   * e2ea6049aa8c495dda6b78395435170fa0c56b8c Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147886372) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4932) 
   * 116926b9f65acc72ae3058efb19a47ce8ca0f412 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147928631) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4954) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375782409
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowTableSerializer.java
 ##########
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+
+/**
+ * The implementation of TableSerializer in legacy planner.
+ */
+@Internal
+public final class RowTableSerializer extends TableSerializer<Row> {
 
 Review comment:
   Rename this to `RowTableFunctionSerializer`?

----------------------------------------------------------------

[GitHub] [flink] flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11020: [FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner
URL: https://github.com/apache/flink/pull/11020#issuecomment-582346890
 
 
   ## CI report:
   
   * 024d27cfa21d92931533773e11b6b672baebad2e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/147523105) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4857) 
   * a2f0e14c373b5fb8aa56ec8371899e549e8859fa Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/147536855) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4862) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

----------------------------------------------------------------

[GitHub] [flink] hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11020: [FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner
URL: https://github.com/apache/flink/pull/11020#discussion_r375787457
 
 

 ##########
 File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/TableSerializer.java
 ##########
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+
+/**
+ * Base Table Serializer for Table Function.
+ */
+@Internal
+abstract class TableSerializer<T> extends TypeSerializer<T> {
+
+	private final TypeSerializer[] fieldSerializers;
+
+	private transient boolean[] nullMask;
+
+	TableSerializer(TypeSerializer[] fieldSerializers) {
+		this.fieldSerializers = fieldSerializers;
+		this.nullMask = new boolean[Math.max(fieldSerializers.length - 8, 0)];
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<T> duplicate() {
+		throw new RuntimeException("This method duplicate() should not be called");
+	}
+
+	@Override
+	public T createInstance() {
+		return unwantedMethodCall("createInstance()");
+	}
+
+	@Override
+	public T copy(T from) {
+		return unwantedMethodCall("copy(T from)");
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		return unwantedMethodCall("copy(T from, T reuse)");
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return unwantedMethodCall("deserialize(T reuse, DataInputView source)");
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		unwantedMethodCall("copy(DataInputView source, DataOutputView target)");
+	}
+
+	private T unwantedMethodCall(String methodName) {
+		throw new RuntimeException(String.format("The method %s should not be called", methodName));
+	}
+
+	public abstract T createResult(int len);
+
+	public abstract void setField(T result, int index, Object value);
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		int len = fieldSerializers.length;
+		int b = source.readUnsignedByte() & 0xff;
+		DataInputStream inputStream = (DataInputStream) source;
+		if (b == 0x00 && inputStream.available() == 0) {
+			return createResult(0);
+		}
+		T result = createResult(len);
+		int minLen = Math.min(8, len);
+		readIntoNullMask(len - 8, source, nullMask);
+		for (int i = 0; i < minLen; i++) {
+			if ((b & 0x80) > 0) {
+				setField(result, i, null);
+			} else {
+				setField(result, i, fieldSerializers[i].deserialize(source));
+			}
+			b = b << 1;
+		}
+		for (int i = 0, j = minLen; j < len; i++, j++) {
+			if (nullMask[i]) {
+				setField(result, j, null);
+			} else {
+				setField(result, j, fieldSerializers[j].deserialize(source));
+			}
+		}
+
+		return result;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
 
 Review comment:
   This method has not been overridden by the child class `RowTableFunctionSerializer`. Once we have multiple child classes, the `equals` method would be wrong: for example, a `RowTableFunctionSerializer` would be equal to a `TupleTableFunctionSerializer` if they share the same field serializers.
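
One possible way to address this, sketched under assumptions: the body of `equals` is not shown in this excerpt, so the sketch assumes it compares only the field serializers, as the comment implies. All class names below (`BaseSketch`, `RowSketch`, `TupleSketch`) are hypothetical stand-ins, and plain strings stand in for the field serializers. Adding a `getClass()` comparison in the base class keeps two different subclasses from being equal even when their fields match.

```java
import java.util.Arrays;

public class SerializerEqualsSketch {

    // Hypothetical base class; strings stand in for the field serializers.
    public abstract static class BaseSketch {
        private final String[] fieldSerializers;

        protected BaseSketch(String[] fieldSerializers) {
            this.fieldSerializers = fieldSerializers;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            // Comparing runtime classes makes different subclasses unequal
            // without requiring each subclass to override equals itself.
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            BaseSketch other = (BaseSketch) obj;
            return Arrays.equals(fieldSerializers, other.fieldSerializers);
        }

        @Override
        public int hashCode() {
            // Kept consistent with equals: mixes in the runtime class.
            return 31 * getClass().hashCode() + Arrays.hashCode(fieldSerializers);
        }
    }

    public static final class RowSketch extends BaseSketch {
        public RowSketch(String[] fieldSerializers) {
            super(fieldSerializers);
        }
    }

    public static final class TupleSketch extends BaseSketch {
        public TupleSketch(String[] fieldSerializers) {
            super(fieldSerializers);
        }
    }

    public static void main(String[] args) {
        String[] fields = {"INT", "VARCHAR"};
        System.out.println(new RowSketch(fields).equals(new RowSketch(fields.clone())));
        System.out.println(new RowSketch(fields).equals(new TupleSketch(fields)));
    }
}
```

The alternative is to override `equals` and `hashCode` in each child class; the `getClass()` check simply places the guard in one spot.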

----------------------------------------------------------------
Apache Git Services