Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/07 09:28:27 UTC

[GitHub] [flink] HuangXingBo opened a new pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

HuangXingBo opened a new pull request #12841:
URL: https://github.com/apache/flink/pull/12841


   ## What is the purpose of the change
   
   *This pull request extracts the Beam implementation logic out of AbstractPythonFunctionOperator.*
   
   
   ## Brief change log
   
     - *Refactor PythonFunctionRunner and related child classes*
     - *Move Serialization/Deserialization to PythonOperators*
     - *Add ArrowSerializer*
     - *Add Environment Interface*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Previous tests are enough*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325",
       "triggerID" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299) 
   * 7165f8e65fa644d694e2b3ede275532098c25af2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8258612413edd24a2effe7b8cf8c7ccfb2c1caf6 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325",
       "triggerID" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7165f8e65fa644d694e2b3ede275532098c25af2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325) 
   





[GitHub] [flink] dianfu commented on a change in pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #12841:
URL: https://github.com/apache/flink/pull/12841#discussion_r451938393



##########
File path: flink-python/src/main/java/org/apache/flink/python/env/ProcessEnvironment.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Map;
+
+/**
+ * A {@link PythonEnvironment} for executing UDFs in Process.
+ */
+@Internal
+public class ProcessEnvironment implements PythonEnvironment {

Review comment:
       rename to ProcessPythonEnvironment?

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -320,23 +307,20 @@ private void checkInvokeFinishBundleByCount() throws Exception {
 	 */
 	private void checkInvokeFinishBundleByTime() throws Exception {
 		long now = getProcessingTimeService().getCurrentProcessingTime();
-		if (now - lastFinishBundleTime >= maxBundleTimeMills) {
+		if (now - lastFinishBundleTime >= maxBundleTimeMills && elementCount > 0) {
 			invokeFinishBundle();
 		}
 	}
 
-	private void invokeFinishBundle() throws Exception {
-		if (bundleStarted.compareAndSet(true, false)) {
-			pythonFunctionRunner.finishBundle();
-
-			emitResults();
-			elementCount = 0;
-			lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
-			// callback only after current bundle was fully finalized
-			if (bundleFinishedCallback != null) {
-				bundleFinishedCallback.run();
-				bundleFinishedCallback = null;
-			}
+	protected void invokeFinishBundle() throws Exception {

Review comment:
       Many call sites invoke this method without checking `elementCount`. What about wrapping the logic inside the method in an **if** check, so that callers no longer need to check it themselves?
   ```
   if (elementCount > 0) {
       // existing finish-bundle logic
   }
   ```
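A minimal, self-contained sketch of this suggestion follows. `BundleGuardSketch`, `processElement`, and `flushedBundleSizes` are hypothetical stand-ins introduced only for illustration; they are not the actual Flink operator or its fields. The point is only that the `elementCount` guard lives inside `invokeFinishBundle`, so callers can invoke it unconditionally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the operator; only the guard pattern matters here. */
class BundleGuardSketch {
    // Number of elements buffered since the last finished bundle.
    private int elementCount = 0;

    // Records the size of every bundle that was actually finished (illustration only).
    final List<Integer> flushedBundleSizes = new ArrayList<>();

    void processElement(String element) {
        elementCount++;
    }

    /** Callers may invoke this unconditionally; empty bundles are skipped here. */
    void invokeFinishBundle() {
        if (elementCount > 0) {
            flushedBundleSizes.add(elementCount);
            elementCount = 0;
        }
    }

    public static void main(String[] args) {
        BundleGuardSketch op = new BundleGuardSketch();
        op.invokeFinishBundle();   // nothing buffered: no-op
        op.processElement("a");
        op.processElement("b");
        op.invokeFinishBundle();   // finishes a bundle of two elements
        op.invokeFinishBundle();   // no-op again
        System.out.println(op.flushedBundleSizes);
    }
}
```

With the guard in one place, a time-based or count-based trigger would only need to decide when to call the method, not whether there is anything to flush.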
   

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java
##########
@@ -90,4 +90,10 @@ public void bufferInput(CRow input) {
 	public Row getFunctionInput(CRow element) {
 		return Row.project(element.row(), userDefinedFunctionInputOffsets);
 	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public TypeSerializer<Row> getInputTypeSerializer() {

Review comment:
       ditto

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
##########
@@ -76,53 +70,74 @@ public RowDataArrowPythonScalarFunctionOperator(
 	@Override
 	public void open() throws Exception {
 		super.open();
-		allocator = ArrowUtils.getRootAllocator().newChildAllocator("reader", 0, Long.MAX_VALUE);
-		reader = new ArrowStreamReader(bais, allocator);
+		maxArrowBatchSize = Math.min(getPythonConfig().getMaxArrowBatchSize(), maxBundleSize);
+		arrowSerializer = new RowDataArrowSerializer(userDefinedFunctionInputType, userDefinedFunctionOutputType);
+		arrowSerializer.open(bais, baos);
+		currentBatchCount = 0;
 	}
 
 	@Override
-	public void close() throws Exception {
-		try {
-			super.close();
-		} finally {
-			reader.close();
-			allocator.close();
+	public void processElement(StreamRecord<RowData> element) throws Exception {
+		RowData value = element.getValue();
+		bufferInput(value);
+		arrowSerializer.dump(getFunctionInput(value));
+		currentBatchCount++;
+		if (currentBatchCount >= maxArrowBatchSize) {
+			invokeCurrentBatch();
 		}
+		checkInvokeFinishBundleByCount();
+		emitResults();
+	}
+
+	@Override
+	protected void invokeFinishBundle() throws Exception {
+		invokeCurrentBatch();
+		super.invokeFinishBundle();
 	}
 
 	@Override
-	public PythonFunctionRunner<RowData> createPythonFunctionRunner(
-		FnDataReceiver<byte[]> resultReceiver,
-		PythonEnvironmentManager pythonEnvironmentManager,
-		Map<String, String> jobOptions) {
-		return new RowDataArrowPythonScalarFunctionRunner(
-			getRuntimeContext().getTaskName(),
-			resultReceiver,
-			scalarFunctions,
-			pythonEnvironmentManager,
-			userDefinedFunctionInputType,
-			userDefinedFunctionOutputType,
-			getPythonConfig().getMaxArrowBatchSize(),
-			jobOptions,
-			getFlinkMetricContainer());
+	public void endInput() throws Exception {
+		invokeCurrentBatch();
+		super.endInput();
+	}
+
+	@Override
+	public void dispose() throws Exception {

Review comment:
       A dispose method should also be added to ArrowPythonScalarFunctionOperator.

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
##########
@@ -157,6 +157,21 @@
 	 */
 	protected transient TypeSerializer<Row> forwardedInputSerializer;
 
+	/**
+	 *
+	 */
+	protected transient TypeSerializer<Row> inputTypeSerializer;

Review comment:
       private

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonStatelessFunctionRunner.java
##########
@@ -158,24 +156,6 @@ public ExecutableStage createExecutableStage() throws Exception {
 			components, createPythonExecutionEnvironment(), input, sideInputs, userStates, timers, transforms, outputs, createValueOnlyWireCoderSetting());
 	}
 
-	public FlinkFnApi.UserDefinedFunction getUserDefinedFunctionProto(PythonFunctionInfo pythonFunctionInfo) {
-		FlinkFnApi.UserDefinedFunction.Builder builder = FlinkFnApi.UserDefinedFunction.newBuilder();
-		builder.setPayload(ByteString.copyFrom(pythonFunctionInfo.getPythonFunction().getSerializedPythonFunction()));
-		for (Object input : pythonFunctionInfo.getInputs()) {
-			FlinkFnApi.UserDefinedFunction.Input.Builder inputProto =
-				FlinkFnApi.UserDefinedFunction.Input.newBuilder();
-			if (input instanceof PythonFunctionInfo) {
-				inputProto.setUdf(getUserDefinedFunctionProto((PythonFunctionInfo) input));
-			} else if (input instanceof Integer) {
-				inputProto.setInputOffset((Integer) input);
-			} else {
-				inputProto.setInputConstant(ByteString.copyFrom((byte[]) input));
-			}
-			builder.addInputs(inputProto);
-		}
-		return builder.build();
-	}
-

Review comment:
       getInputType/getOutputType aren't used anymore and can be removed.

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrowSerializer.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes.
+ *
+ * @param <T> type of the input elements.
+ */
+public abstract class ArrowSerializer<T> {

Review comment:
       Mark as @Internal; the same applies to the other classes.

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
##########
@@ -76,53 +70,74 @@ public RowDataArrowPythonScalarFunctionOperator(
 	@Override
 	public void open() throws Exception {
 		super.open();
-		allocator = ArrowUtils.getRootAllocator().newChildAllocator("reader", 0, Long.MAX_VALUE);
-		reader = new ArrowStreamReader(bais, allocator);
+		maxArrowBatchSize = Math.min(getPythonConfig().getMaxArrowBatchSize(), maxBundleSize);

Review comment:
       The logic for maxArrowBatchSize is different between RowDataArrowPythonScalarFunctionOperator and ArrowPythonScalarFunctionOperator
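For reference, the batching behaviour visible in the hunk above for RowDataArrowPythonScalarFunctionOperator can be sketched roughly as below. This is a simplified illustration under stated assumptions: `ArrowBatchingSketch` and its `batches` list are hypothetical, the real operator serializes rows with an Arrow serializer rather than counting them, and the empty-batch guard in `invokeCurrentBatch` is an assumption that the diff does not show.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of capping the Arrow batch size by the bundle size. */
class ArrowBatchingSketch {
    // Effective batch size: the configured Arrow batch size, capped by the bundle size.
    private final int maxArrowBatchSize;

    // Number of elements written into the current, not yet sent, batch.
    private int currentBatchCount = 0;

    // Sizes of the batches that were sent (illustration only).
    final List<Integer> batches = new ArrayList<>();

    ArrowBatchingSketch(int configuredMaxArrowBatchSize, int maxBundleSize) {
        this.maxArrowBatchSize = Math.min(configuredMaxArrowBatchSize, maxBundleSize);
    }

    void processElement() {
        currentBatchCount++;
        if (currentBatchCount >= maxArrowBatchSize) {
            invokeCurrentBatch();
        }
    }

    /** Sends the pending batch, if any; assumed to be a no-op for an empty batch. */
    void invokeCurrentBatch() {
        if (currentBatchCount > 0) {
            batches.add(currentBatchCount);
            currentBatchCount = 0;
        }
    }

    public static void main(String[] args) {
        // Configured batch size 10, bundle size 4: the effective batch size is 4.
        ArrowBatchingSketch op = new ArrowBatchingSketch(10, 4);
        for (int i = 0; i < 9; i++) {
            op.processElement();
        }
        op.invokeCurrentBatch();   // flush the remainder, as finishing a bundle would
        System.out.println(op.batches);
    }
}
```

Keeping this computation in one shared place would be one way to avoid the two operators drifting apart, which is the concern raised in the comment.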

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamPythonFunctionRunner.java
##########
@@ -212,7 +181,54 @@ public void close() throws Exception {
 	}
 
 	@Override
-	public void startBundle() {
+	public void process(byte[] data) throws Exception {
+		checkInvokeStartBundle();
+		mainInputReceiver.accept(WindowedValue.valueInGlobalWindow(data));
+	}
+
+	@Override
+	public Tuple2<byte[], Integer> pollResult() throws Exception {
+		byte[] result = resultBuffer.poll();
+		if (result == null) {
+			return null;
+		} else {
+			this.resultTuple.f0 = result;
+			this.resultTuple.f1 = result.length;
+			return this.resultTuple;
+		}
+	}
+
+	@Override
+	public void flush() throws Exception {
+		if (bundleStarted) {
+			finishBundle();
+			bundleStarted = false;
+		}
+	}
+
+	public JobBundleFactory createJobBundleFactory(Struct pipelineOptions) throws Exception {
+		return DefaultJobBundleFactory.create(
+			JobInfo.create(taskName, taskName, environmentManager.createRetrievalToken(), pipelineOptions));
+	}
+
+	/**
+	 * Creates a specification which specifies the portability Python execution environment.
+	 * It's used by Beam's portability framework to creates the actual Python execution environment.
+	 */
+	protected RunnerApi.Environment createPythonExecutionEnvironment() throws Exception {

Review comment:
       private

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrowSerializer.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes.
+ *
+ * @param <T> type of the input elements.
+ */
+public abstract class ArrowSerializer<T> {
+
+	/**
+	 * The input RowType.
+	 */
+	protected final RowType inputType;
+
+	/**
+	 * The output RowType.
+	 */
+	protected final RowType outputType;
+
+	/**
+	 * Allocator which is used for byte buffer allocation.
+	 */
+	private transient BufferAllocator allocatorReader;

Review comment:
       rename to allocator

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowDataPythonScalarFunctionOperator.java
##########
@@ -98,6 +99,12 @@ public RowData getFunctionInput(RowData element) {
 		return udfInputProjection.apply(element);
 	}
 
+	@Override
+	@SuppressWarnings("unchecked")
+	public TypeSerializer<RowData> getInputTypeSerializer() {

Review comment:
       Move to RowDataPythonScalarFunctionOperator as this method is not necessary for RowDataArrowPythonScalarFunctionOperator

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -296,19 +286,16 @@ private void reserveMemoryForPythonWorker() throws MemoryReservationException {
 		}
 	}
 
-	/**
-	 * Checks whether to invoke startBundle.
-	 */
-	private void checkInvokeStartBundle() throws Exception {
-		if (bundleStarted.compareAndSet(false, true)) {
-			pythonFunctionRunner.startBundle();
+	protected void emitResults() throws Exception {
+		while ((resultTuple = pythonFunctionRunner.pollResult()) != null) {
+			emitResult();

Review comment:
       What about making **resultTuple** a local variable and passing it as an argument? E.g.,
   ```
   emitResult(resultTuple);
   ```
   That would make the code more readable. What do you think?
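A small sketch of what the local-variable version could look like is given below. `EmitResultsSketch`, `addResult`, and `emittedLengths` are hypothetical names used for illustration, and the result type is simplified to `byte[]`, whereas the runner in the diff returns a `Tuple2<byte[], Integer>`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in showing a polled result held in a local variable. */
class EmitResultsSketch {
    // Buffer of serialized results, standing in for the Python function runner.
    private final Queue<byte[]> resultBuffer = new ArrayDeque<>();

    // Lengths of the emitted results (illustration only).
    final List<Integer> emittedLengths = new ArrayList<>();

    void addResult(byte[] result) {
        resultBuffer.add(result);
    }

    /** Returns the next result, or null when the buffer is empty. */
    byte[] pollResult() {
        return resultBuffer.poll();
    }

    void emitResults() {
        byte[] result;   // local variable instead of a mutable field
        while ((result = pollResult()) != null) {
            emitResult(result);
        }
    }

    /** The result arrives as a parameter, so the data flow is explicit. */
    void emitResult(byte[] result) {
        emittedLengths.add(result.length);
    }

    public static void main(String[] args) {
        EmitResultsSketch op = new EmitResultsSketch();
        op.addResult(new byte[] {1, 2, 3});
        op.addResult(new byte[] {4});
        op.emitResults();
        System.out.println(op.emittedLengths);
    }
}
```

Passing the value explicitly means `emitResult` no longer depends on hidden state set by its caller.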

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrowSerializer.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes.
+ *
+ * @param <T> type of the input elements.
+ */
+public abstract class ArrowSerializer<T> {
+
+	/**
+	 * The input RowType.
+	 */
+	protected final RowType inputType;
+
+	/**
+	 * The output RowType.
+	 */
+	protected final RowType outputType;
+
+	/**
+	 * Allocator which is used for byte buffer allocation.
+	 */
+	private transient BufferAllocator allocatorReader;
+
+	/**
+	 * Reader which is responsible for deserialize the Arrow format data to the Flink rows.
+	 */
+	private transient ArrowReader<T> arrowReader;
+
+	/**
+	 * Reader which is responsible for convert the execution result from
+	 * byte array to arrow format.
+	 */
+	private transient ArrowStreamReader arrowStreamReader;
+
+	/**
+	 * Container that holds a set of vectors for the input elements
+	 * to be sent to the Python worker.
+	 */
+	protected transient VectorSchemaRoot rootWriter;

Review comment:
       private

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/ArrowSerializer.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils.serializers.python;
+
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes.
+ *
+ * @param <T> type of the input elements.
+ */
+public abstract class ArrowSerializer<T> {

Review comment:
       What about moving it to the package **org.apache.flink.table.runtime.arrow.serializers**?

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
##########
@@ -157,6 +157,21 @@
 	 */
 	protected transient TypeSerializer<Row> forwardedInputSerializer;
 
+	/**
+	 *
+	 */
+	protected transient TypeSerializer<Row> inputTypeSerializer;
+
+	/**
+	 * Reusable OutputStream used to holding the serialized input elements.
+	 */
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	/**
+	 * OutputStream Wrapper.
+	 */
+	protected transient DataOutputViewStreamWrapper baosWrapper;

Review comment:
       ditto







[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325",
       "triggerID" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4356",
       "triggerID" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0d6a9f6454d2a0a53cf2830a04a57045777fd29",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4365",
       "triggerID" : "e0d6a9f6454d2a0a53cf2830a04a57045777fd29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9eb4991ed1ecd766c0f866b7d9722f5972acdae9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4356) 
   * e0d6a9f6454d2a0a53cf2830a04a57045777fd29 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4365) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299) 
   * 7165f8e65fa644d694e2b3ede275532098c25af2 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325",
       "triggerID" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4356",
       "triggerID" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7165f8e65fa644d694e2b3ede275532098c25af2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325) 
   * 9eb4991ed1ecd766c0f866b7d9722f5972acdae9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4356) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8258612413edd24a2effe7b8cf8c7ccfb2c1caf6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654726188


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 8258612413edd24a2effe7b8cf8c7ccfb2c1caf6 (Tue Jul 07 09:31:44 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-18490).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

   The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] dianfu commented on a change in pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #12841:
URL: https://github.com/apache/flink/pull/12841#discussion_r452034065



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow.serializers;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes.
+ *
+ * @param <T> type of the input elements.
+ */
+@Internal
+public abstract class ArrowSerializer<T> {
+
+	/**
+	 * The input RowType.
+	 */
+	protected final RowType inputType;
+
+	/**
+	 * The output RowType.
+	 */
+	protected final RowType outputType;
+
+	/**
+	 * Allocator which is used for byte buffer allocation.
+	 */
+	private transient BufferAllocator allocator;
+
+	/**
+	 * Reader which is responsible for deserializing the Arrow format data to the Flink rows.
+	 */
+	private transient ArrowReader<T> arrowReader;
+
+	/**
+	 * Reader which is responsible for converting the execution result from
+	 * byte array to arrow format.
+	 */
+	private transient ArrowStreamReader arrowStreamReader;
+
+	/**
+	 * Container that holds a set of vectors for the input elements
+	 * to be sent to the Python worker.
+	 */
+	transient VectorSchemaRoot rootWriter;
+
+	/**
+	 * Writer which is responsible for serializing the input elements to arrow format.
+	 */
+	private transient ArrowWriter<T> arrowWriter;
+
+	/**
+	 * Writer which is responsible for converting the arrow format data into byte array.
+	 */
+	private transient ArrowStreamWriter arrowStreamWriter;
+
+	public ArrowSerializer(
+		RowType inputType,
+		RowType outputType) {
+		this.inputType = inputType;
+		this.outputType = outputType;
+	}
+
+	public void open(InputStream bais, OutputStream baos) throws Exception {
+		allocator = ArrowUtils.getRootAllocator().newChildAllocator("allocator", 0, Long.MAX_VALUE);
+		arrowStreamReader = new ArrowStreamReader(bais, allocator);
+
+		rootWriter = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(inputType), allocator);
+		arrowWriter = createArrowWriter();
+		arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos);
+		arrowStreamWriter.start();
+	}
+
+	public int load() throws IOException {
+		arrowStreamReader.loadNextBatch();
+		VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
+		if (arrowReader == null) {
+			arrowReader = createArrowReader(root);
+		}
+		return root.getRowCount();
+	}
+
+	public T index(int i) {

Review comment:
       rename to **read**?

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow.serializers;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes.
+ *
+ * @param <T> type of the input elements.
+ */
+@Internal
+public abstract class ArrowSerializer<T> {
+
+	/**
+	 * The input RowType.
+	 */
+	protected final RowType inputType;
+
+	/**
+	 * The output RowType.
+	 */
+	protected final RowType outputType;
+
+	/**
+	 * Allocator which is used for byte buffer allocation.
+	 */
+	private transient BufferAllocator allocator;
+
+	/**
+	 * Reader which is responsible for deserializing the Arrow format data to the Flink rows.
+	 */
+	private transient ArrowReader<T> arrowReader;
+
+	/**
+	 * Reader which is responsible for converting the execution result from
+	 * byte array to arrow format.
+	 */
+	private transient ArrowStreamReader arrowStreamReader;
+
+	/**
+	 * Container that holds a set of vectors for the input elements
+	 * to be sent to the Python worker.
+	 */
+	transient VectorSchemaRoot rootWriter;
+
+	/**
+	 * Writer which is responsible for serializing the input elements to arrow format.
+	 */
+	private transient ArrowWriter<T> arrowWriter;
+
+	/**
+	 * Writer which is responsible for converting the arrow format data into byte array.
+	 */
+	private transient ArrowStreamWriter arrowStreamWriter;
+
+	public ArrowSerializer(
+		RowType inputType,
+		RowType outputType) {
+		this.inputType = inputType;
+		this.outputType = outputType;
+	}
+
+	public void open(InputStream bais, OutputStream baos) throws Exception {
+		allocator = ArrowUtils.getRootAllocator().newChildAllocator("allocator", 0, Long.MAX_VALUE);
+		arrowStreamReader = new ArrowStreamReader(bais, allocator);
+
+		rootWriter = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(inputType), allocator);
+		arrowWriter = createArrowWriter();
+		arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos);
+		arrowStreamWriter.start();
+	}
+
+	public int load() throws IOException {
+		arrowStreamReader.loadNextBatch();
+		VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
+		if (arrowReader == null) {
+			arrowReader = createArrowReader(root);
+		}
+		return root.getRowCount();
+	}
+
+	public T index(int i) {
+		return arrowReader.read(i);
+	}
+
+	public void dump(T element) {

Review comment:
       rename to **write**?

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/RowDataArrowPythonScalarFunctionOperator.java
##########
@@ -47,21 +39,23 @@
 
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * Allocator which is used for byte buffer allocation.
-	 */
-	private transient BufferAllocator allocator;
+	private static final String SCHEMA_ARROW_CODER_URN = "flink:coder:schema:scalar_function:arrow:v1";
+
+	static {

Review comment:
       Move to **ArrowSerializer**?

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
##########
@@ -160,10 +190,17 @@ public void processElement(StreamRecord<IN> element) throws Exception {
 
 	public abstract UDFIN getFunctionInput(IN element);
 
-	public abstract PythonFunctionRunner<UDFIN> createPythonFunctionRunner(
-		FnDataReceiver<byte[]> resultReceiver,
-		PythonEnvironmentManager pythonEnvironmentManager,
-		Map<String, String> jobOptions);
+	/**
+	 * Gets the proto representation of the Python user-defined functions to be executed.
+	 */
+	@VisibleForTesting

Review comment:
       remove **VisibleForTesting**
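
For illustration only, the renames suggested in the comments above (`index` to `read`, `dump` to `write`) might read as follows. This is a minimal, Arrow-free sketch under stated assumptions, not code from the pull request: the class name `BatchSerializerSketch`, the helper `finishCurrentBatch`, and the in-memory `channel` list are hypothetical stand-ins for the Arrow streams that `ArrowSerializer` wraps.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Arrow-free stand-in for the serializer under review. */
class BatchSerializerSketch<T> {

    /** Rows written but not yet handed over as a batch. */
    private final List<T> pending = new ArrayList<>();

    /** Stand-in for the byte streams: a queue of finished batches. */
    private final List<List<T>> channel = new ArrayList<>();

    /** Rows of the most recently loaded batch. */
    private List<T> loaded = new ArrayList<>();

    /** Named dump(T) in the diff; buffers one element for the next batch. */
    public void write(T element) {
        pending.add(element);
    }

    /** Hypothetical helper: hands the buffered rows over as one batch. */
    public void finishCurrentBatch() {
        channel.add(new ArrayList<>(pending));
        pending.clear();
    }

    /** Loads the next batch and returns its row count, as load() does in the diff. */
    public int load() {
        loaded = channel.isEmpty() ? new ArrayList<>() : channel.remove(0);
        return loaded.size();
    }

    /** Named index(int) in the diff; reads row i of the loaded batch. */
    public T read(int i) {
        return loaded.get(i);
    }
}
```

With these names a caller pairs `write` with `read`, which is presumably the symmetry the review comments are after.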







[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325",
       "triggerID" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7165f8e65fa644d694e2b3ede275532098c25af2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325) 
   * 9eb4991ed1ecd766c0f866b7d9722f5972acdae9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8258612413edd24a2effe7b8cf8c7ccfb2c1caf6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298) 
   * e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325",
       "triggerID" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4356",
       "triggerID" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0d6a9f6454d2a0a53cf2830a04a57045777fd29",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e0d6a9f6454d2a0a53cf2830a04a57045777fd29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7165f8e65fa644d694e2b3ede275532098c25af2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325) 
   * 9eb4991ed1ecd766c0f866b7d9722f5972acdae9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4356) 
   * e0d6a9f6454d2a0a53cf2830a04a57045777fd29 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dianfu closed pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
dianfu closed pull request #12841:
URL: https://github.com/apache/flink/pull/12841


   





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8258612413edd24a2effe7b8cf8c7ccfb2c1caf6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298) 
   * e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325",
       "triggerID" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4356",
       "triggerID" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0d6a9f6454d2a0a53cf2830a04a57045777fd29",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4365",
       "triggerID" : "e0d6a9f6454d2a0a53cf2830a04a57045777fd29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7165f8e65fa644d694e2b3ede275532098c25af2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325) 
   * 9eb4991ed1ecd766c0f866b7d9722f5972acdae9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4356) 
   * e0d6a9f6454d2a0a53cf2830a04a57045777fd29 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4365) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4299",
       "triggerID" : "e86b46c6e9bab96d321a2cc7e245f57cb3a8a78b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4325",
       "triggerID" : "7165f8e65fa644d694e2b3ede275532098c25af2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4356",
       "triggerID" : "9eb4991ed1ecd766c0f866b7d9722f5972acdae9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e0d6a9f6454d2a0a53cf2830a04a57045777fd29",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4365",
       "triggerID" : "e0d6a9f6454d2a0a53cf2830a04a57045777fd29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e0d6a9f6454d2a0a53cf2830a04a57045777fd29 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4365) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12841:
URL: https://github.com/apache/flink/pull/12841#issuecomment-654737071


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298",
       "triggerID" : "8258612413edd24a2effe7b8cf8c7ccfb2c1caf6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8258612413edd24a2effe7b8cf8c7ccfb2c1caf6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4298) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dianfu commented on a change in pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #12841:
URL: https://github.com/apache/flink/pull/12841#discussion_r451255412



##########
File path: flink-python/src/main/java/org/apache/flink/python/AsyncPythonFunctionRunner.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The basic interface of Runner is responsible for asynchronous execution of Python functions.

Review comment:
       nit: Runner -> runner which

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -320,23 +322,22 @@ private void checkInvokeFinishBundleByCount() throws Exception {
 	 */
 	private void checkInvokeFinishBundleByTime() throws Exception {
 		long now = getProcessingTimeService().getCurrentProcessingTime();
-		if (now - lastFinishBundleTime >= maxBundleTimeMills) {
+		if (now - lastFinishBundleTime >= maxBundleTimeMills && elementCount > 0) {
 			invokeFinishBundle();
 		}
 	}
 
-	private void invokeFinishBundle() throws Exception {
-		if (bundleStarted.compareAndSet(true, false)) {
-			pythonFunctionRunner.finishBundle();
-
-			emitResults();
-			elementCount = 0;
-			lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
-			// callback only after current bundle was fully finalized
-			if (bundleFinishedCallback != null) {
-				bundleFinishedCallback.run();
-				bundleFinishedCallback = null;
-			}
+	protected void invokeFinishBundle() throws Exception {

Review comment:
       Originally, the flush logic was handled inside the lock `bundleStarted`. Why was the lock removed?
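
For readers following along, the guard in the removed lines is the usual `AtomicBoolean` compare-and-set pattern: only the caller that flips the flag from `true` to `false` performs the flush. Below is a minimal sketch of that pattern, not the operator's actual code. The class `BundleGuardSketch`, `processElement()` and the counters are hypothetical stand-ins, and setting the flag on the first element is an assumption. Only `bundleStarted` and `invokeFinishBundle` are names taken from the diff.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the operator; it only illustrates the guard pattern.
final class BundleGuardSketch {
    private final AtomicBoolean bundleStarted = new AtomicBoolean(false);
    private int elementCount = 0;
    private int flushCount = 0;

    void processElement() {
        // Assumption: the bundle is marked as started when the first element arrives.
        bundleStarted.compareAndSet(false, true);
        elementCount++;
    }

    void invokeFinishBundle() {
        // Only the call that flips true -> false runs the flush logic.
        if (bundleStarted.compareAndSet(true, false)) {
            flushCount++;
            elementCount = 0;
        }
    }

    int flushCount() {
        return flushCount;
    }

    int elementCount() {
        return elementCount;
    }

    public static void main(String[] args) {
        BundleGuardSketch op = new BundleGuardSketch();
        op.invokeFinishBundle(); // no bundle started, so nothing is flushed
        op.processElement();
        op.invokeFinishBundle(); // flushes once
        op.invokeFinishBundle(); // bundle already finished, so this is skipped
        System.out.println(op.flushCount()); // prints 1
    }
}
```

Without the guard, the second and third calls would each run the flush logic, which is the behavioural difference the question above is about.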

##########
File path: flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java
##########
@@ -19,42 +19,37 @@
 package org.apache.flink.python;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 /**
  * The base interface of runner which is responsible for the execution of Python functions.
- *
- * @param <IN> Type of the input elements.
  */
 @Internal
-public interface PythonFunctionRunner<IN> {
+public interface PythonFunctionRunner {
 
 	/**
 	 * Prepares the Python function runner, such as preparing the Python execution environment, etc.
 	 */
-	void open() throws Exception;
+	void open(PythonConfig config) throws Exception;
 
 	/**
 	 * Tear-down the Python function runner.
 	 */
 	void close() throws Exception;
 
 	/**
-	 * Prepares to process the next bundle of elements.
-	 */
-	void startBundle() throws Exception;
-
-	/**
-	 * Forces to finish the processing of the current bundle of elements. It will flush
-	 * the data cached in the data buffer for processing and retrieves the state mutations (if exists)
-	 * made by the Python function. The call blocks until all of the outputs produced by this
-	 * bundle have been received.
+	 * Executes the Python function with the input byte array.
+	 *
+	 * @param data the byte array data.
 	 */
-	void finishBundle() throws Exception;
+	void process(byte[] data) throws Exception;
 
 	/**
-	 * Executes the Python function with the input element. It's not required to execute
-	 * the Python function immediately. The runner may choose to buffer the input element and
-	 * execute them in batch for better performance.
+	 * Retrieves and removes the Python function result.

Review comment:
       Remove the words **and removes**? I think it's up to the implementation of the PythonFunctionRunner. What do you think? 
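
To make the wording question concrete, here is a small sketch of the two possible contracts, using the `poll`/`peek` distinction from `java.util.Queue`. It assumes results are kept in an in-memory queue. `PollSemanticsSketch`, `pollResult()` and `peekResult()` are hypothetical names for illustration and are not claimed to be the interface's real methods.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical result buffer; it is not the real PythonFunctionRunner.
final class PollSemanticsSketch {
    private final Queue<byte[]> results = new ArrayDeque<>();

    void add(byte[] result) {
        results.add(result);
    }

    // "Retrieves and removes": the head is gone after the call; null when empty.
    byte[] pollResult() {
        return results.poll();
    }

    // "Retrieves" only: the head stays in the buffer; null when empty.
    byte[] peekResult() {
        return results.peek();
    }

    int size() {
        return results.size();
    }

    public static void main(String[] args) {
        PollSemanticsSketch buffer = new PollSemanticsSketch();
        buffer.add(new byte[] {1});
        buffer.peekResult();
        System.out.println(buffer.size()); // prints 1
        buffer.pollResult();
        System.out.println(buffer.size()); // prints 0
    }
}
```

If the Javadoc only says "retrieves", either behaviour is allowed and the choice is left to the implementation, which is what the comment above suggests.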

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/Environment.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The base interface of python environment for executing UDFs.
+ */
+@Internal
+public interface Environment {

Review comment:
       There is also an **Environment** class in flink-runtime. What about renaming this class to **PythonEnvironment** or another name?

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
##########
@@ -106,6 +106,21 @@
 	 */
 	protected transient DataInputViewStreamWrapper baisWrapper;
 
+	/**
+	 * Reusable OutputStream used to holding the serialized input elements.
+	 */
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	/**
+	 * OutputStream Wrapper.
+	 */
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	/**
+	 * The TypeSerializer for input elements.
+	 */
+	protected transient TypeSerializer<UDFIN> inputTypeSerializer;

Review comment:
       ditto

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
##########
@@ -120,36 +135,68 @@ public AbstractStatelessFunctionOperator(
 
 	@Override
 	public void open() throws Exception {
-		forwardedInputQueue = new LinkedBlockingQueue<>();
-		userDefinedFunctionResultQueue = new LinkedBlockingQueue<>();
+		forwardedInputQueue = new LinkedList<>();
 		userDefinedFunctionInputType = new RowType(
 			Arrays.stream(userDefinedFunctionInputOffsets)
 				.mapToObj(i -> inputType.getFields().get(i))
 				.collect(Collectors.toList()));
 		bais = new ByteArrayInputStreamWithPos();
 		baisWrapper = new DataInputViewStreamWrapper(bais);
+		baos = new ByteArrayOutputStreamWithPos();
+		baosWrapper = new DataOutputViewStreamWrapper(baos);
+		inputTypeSerializer = getInputTypeSerializer();
 		super.open();
 	}
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		bufferInput(element.getValue());
-		super.processElement(element);
+		IN value = element.getValue();
+		bufferInput(value);
+		inputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
+		pythonFunctionRunner.process(baos.toByteArray());
 		emitResults();
+		baos.reset();
 	}
 
 	@Override
-	public PythonFunctionRunner<IN> createPythonFunctionRunner() throws IOException {
-		final FnDataReceiver<byte[]> userDefinedFunctionResultReceiver = input -> {
-			// handover to queue, do not block the result receiver thread
-			userDefinedFunctionResultQueue.put(input);
-		};
-
-		return new ProjectUdfInputPythonScalarFunctionRunner(
-			createPythonFunctionRunner(
-				userDefinedFunctionResultReceiver,
-				createPythonEnvironmentManager(),
-				jobOptions));
+	public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
+		return new BeamPythonStatelessFunctionRunner(
+			getRuntimeContext().getTaskName(),
+			createPythonEnvironmentManager(),
+			userDefinedFunctionInputType,
+			userDefinedFunctionOutputType,
+			getFunctionUrn(),
+			getUserDefinedFunctionsProto(),
+			getInputOutputCoderUrn(),
+			jobOptions,
+			getFlinkMetricContainer());
+	}
+
+	protected void emitResults() throws Exception {
+		if (this.isAsyncPythonFunctionRunner) {
+			checkInvokeFinishBundleByCount();

Review comment:
       Currently, for the async runner, emitResults will only be called on finishBundle. However, that was not the case previously.
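
A rough sketch of the timing concern, assuming a purely count-based bundle: when results are emitted only on bundle finish, nothing reaches downstream until the bundle size is reached. `EmitTimingSketch`, `maxBundleSize` and the string elements are hypothetical. The real operator also finishes bundles by time and on checkpoints, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "emit only on finishBundle"; it is not the operator itself.
final class EmitTimingSketch {
    private final int maxBundleSize;
    private final List<String> buffered = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    EmitTimingSketch(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    void processElement(String value) {
        buffered.add(value);
        // Results become visible only once the bundle is finished.
        if (buffered.size() >= maxBundleSize) {
            finishBundle();
        }
    }

    void finishBundle() {
        emitted.addAll(buffered);
        buffered.clear();
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EmitTimingSketch op = new EmitTimingSketch(3);
        op.processElement("a");
        op.processElement("b");
        System.out.println(op.emitted().size()); // prints 0
        op.processElement("c");
        System.out.println(op.emitted().size()); // prints 3
    }
}
```

In this model the results for "a" and "b" are held back until "c" arrives, whereas emitting after every element would make them available earlier.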

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
##########
@@ -106,6 +106,21 @@
 	 */
 	protected transient DataInputViewStreamWrapper baisWrapper;
 
+	/**
+	 * Reusable OutputStream used to holding the serialized input elements.
+	 */
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	/**
+	 * OutputStream Wrapper.
+	 */
+	protected transient DataOutputViewStreamWrapper baosWrapper;

Review comment:
       can be private



