Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/08 06:59:18 UTC

[GitHub] [flink] dianfu commented on a change in pull request #12841: [FLINK-18490][python] Extract the implementation logic of Beam in AbstractPythonFunctionOperator

dianfu commented on a change in pull request #12841:
URL: https://github.com/apache/flink/pull/12841#discussion_r451255412



##########
File path: flink-python/src/main/java/org/apache/flink/python/AsyncPythonFunctionRunner.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The basic interface of Runner is responsible for asynchronous execution of Python functions.

Review comment:
       nit: Runner -> runner which

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -320,23 +322,22 @@ private void checkInvokeFinishBundleByCount() throws Exception {
 	 */
 	private void checkInvokeFinishBundleByTime() throws Exception {
 		long now = getProcessingTimeService().getCurrentProcessingTime();
-		if (now - lastFinishBundleTime >= maxBundleTimeMills) {
+		if (now - lastFinishBundleTime >= maxBundleTimeMills && elementCount > 0) {
 			invokeFinishBundle();
 		}
 	}
 
-	private void invokeFinishBundle() throws Exception {
-		if (bundleStarted.compareAndSet(true, false)) {
-			pythonFunctionRunner.finishBundle();
-
-			emitResults();
-			elementCount = 0;
-			lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
-			// callback only after current bundle was fully finalized
-			if (bundleFinishedCallback != null) {
-				bundleFinishedCallback.run();
-				bundleFinishedCallback = null;
-			}
+	protected void invokeFinishBundle() throws Exception {

Review comment:
       Originally, the flush logic was handled inside the lock `bundleStarted`. Why was the lock removed?
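
       For reference, the guard in the removed lines can be sketched roughly as below. This is a minimal illustration, not the operator code: the class `BundleGuardSketch`, its counters, and `processElement()` are hypothetical, and only the `compareAndSet(true, false)` check is taken from the diff.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: field names mirror the diff, everything else is assumed.
class BundleGuardSketch {
    private final AtomicBoolean bundleStarted = new AtomicBoolean(false);
    private int elementCount = 0;
    private int flushCount = 0;

    void processElement() {
        // Mark the bundle as started when the first element arrives.
        bundleStarted.compareAndSet(false, true);
        elementCount++;
    }

    void invokeFinishBundle() {
        // Only a call that flips the flag from true to false performs the flush,
        // so a second call without new input does nothing.
        if (bundleStarted.compareAndSet(true, false)) {
            flushCount++;
            elementCount = 0;
        }
    }

    int getFlushCount() {
        return flushCount;
    }

    int getElementCount() {
        return elementCount;
    }

    public static void main(String[] args) {
        BundleGuardSketch op = new BundleGuardSketch();
        op.processElement();
        op.processElement();
        op.invokeFinishBundle();
        op.invokeFinishBundle(); // no-op: no bundle is open
        System.out.println(op.getFlushCount());
    }
}
```

       Without the flag, the `elementCount > 0` check on the time-based path is the only thing that would prevent an empty flush, and it does not cover other callers of `invokeFinishBundle()`.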

##########
File path: flink-python/src/main/java/org/apache/flink/python/PythonFunctionRunner.java
##########
@@ -19,42 +19,37 @@
 package org.apache.flink.python;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 /**
  * The base interface of runner which is responsible for the execution of Python functions.
- *
- * @param <IN> Type of the input elements.
  */
 @Internal
-public interface PythonFunctionRunner<IN> {
+public interface PythonFunctionRunner {
 
 	/**
 	 * Prepares the Python function runner, such as preparing the Python execution environment, etc.
 	 */
-	void open() throws Exception;
+	void open(PythonConfig config) throws Exception;
 
 	/**
 	 * Tear-down the Python function runner.
 	 */
 	void close() throws Exception;
 
 	/**
-	 * Prepares to process the next bundle of elements.
-	 */
-	void startBundle() throws Exception;
-
-	/**
-	 * Forces to finish the processing of the current bundle of elements. It will flush
-	 * the data cached in the data buffer for processing and retrieves the state mutations (if exists)
-	 * made by the Python function. The call blocks until all of the outputs produced by this
-	 * bundle have been received.
+	 * Executes the Python function with the input byte array.
+	 *
+	 * @param data the byte array data.
 	 */
-	void finishBundle() throws Exception;
+	void process(byte[] data) throws Exception;
 
 	/**
-	 * Executes the Python function with the input element. It's not required to execute
-	 * the Python function immediately. The runner may choose to buffer the input element and
-	 * execute them in batch for better performance.
+	 * Retrieves and removes the Python function result.

Review comment:
       Remove the words **and removes**? I think that is up to the implementation of the PythonFunctionRunner. What do you think?
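
       To illustrate the distinction, here is a small sketch of the two behaviours an implementation could choose between. `ResultBufferSketch`, `peekResult()` and `pollResult()` are hypothetical names used only for this example; they are not part of the PR. Only `java.util.Queue#peek` and `#poll` are real APIs.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical result buffer; not the PythonFunctionRunner from the PR.
class ResultBufferSketch {
    private final Queue<byte[]> results = new ArrayDeque<>();

    void add(byte[] result) {
        results.add(result);
    }

    // Retrieves the next result but leaves it in the buffer.
    byte[] peekResult() {
        return results.peek();
    }

    // Retrieves the next result and removes it; returns null when empty.
    byte[] pollResult() {
        return results.poll();
    }

    int size() {
        return results.size();
    }
}
```

       If the Javadoc only says "Retrieves", both variants satisfy the contract, and the caller cannot rely on the result being gone after the call.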

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/Environment.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.env;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The base interface of python environment for executing UDFs.
+ */
+@Internal
+public interface Environment {

Review comment:
       There is also an **Environment** class in flink-runtime. What about renaming this class to **PythonEnvironment** or something similar?

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
##########
@@ -106,6 +106,21 @@
 	 */
 	protected transient DataInputViewStreamWrapper baisWrapper;
 
+	/**
+	 * Reusable OutputStream used to holding the serialized input elements.
+	 */
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	/**
+	 * OutputStream Wrapper.
+	 */
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	/**
+	 * The TypeSerializer for input elements.
+	 */
+	protected transient TypeSerializer<UDFIN> inputTypeSerializer;

Review comment:
       ditto

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
##########
@@ -120,36 +135,68 @@ public AbstractStatelessFunctionOperator(
 
 	@Override
 	public void open() throws Exception {
-		forwardedInputQueue = new LinkedBlockingQueue<>();
-		userDefinedFunctionResultQueue = new LinkedBlockingQueue<>();
+		forwardedInputQueue = new LinkedList<>();
 		userDefinedFunctionInputType = new RowType(
 			Arrays.stream(userDefinedFunctionInputOffsets)
 				.mapToObj(i -> inputType.getFields().get(i))
 				.collect(Collectors.toList()));
 		bais = new ByteArrayInputStreamWithPos();
 		baisWrapper = new DataInputViewStreamWrapper(bais);
+		baos = new ByteArrayOutputStreamWithPos();
+		baosWrapper = new DataOutputViewStreamWrapper(baos);
+		inputTypeSerializer = getInputTypeSerializer();
 		super.open();
 	}
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		bufferInput(element.getValue());
-		super.processElement(element);
+		IN value = element.getValue();
+		bufferInput(value);
+		inputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
+		pythonFunctionRunner.process(baos.toByteArray());
 		emitResults();
+		baos.reset();
 	}
 
 	@Override
-	public PythonFunctionRunner<IN> createPythonFunctionRunner() throws IOException {
-		final FnDataReceiver<byte[]> userDefinedFunctionResultReceiver = input -> {
-			// handover to queue, do not block the result receiver thread
-			userDefinedFunctionResultQueue.put(input);
-		};
-
-		return new ProjectUdfInputPythonScalarFunctionRunner(
-			createPythonFunctionRunner(
-				userDefinedFunctionResultReceiver,
-				createPythonEnvironmentManager(),
-				jobOptions));
+	public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
+		return new BeamPythonStatelessFunctionRunner(
+			getRuntimeContext().getTaskName(),
+			createPythonEnvironmentManager(),
+			userDefinedFunctionInputType,
+			userDefinedFunctionOutputType,
+			getFunctionUrn(),
+			getUserDefinedFunctionsProto(),
+			getInputOutputCoderUrn(),
+			jobOptions,
+			getFlinkMetricContainer());
+	}
+
+	protected void emitResults() throws Exception {
+		if (this.isAsyncPythonFunctionRunner) {
+			checkInvokeFinishBundleByCount();

Review comment:
       With this change, for the async runner, `emitResults` will only be called on finishBundle. However, that was not the case previously.

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
##########
@@ -106,6 +106,21 @@
 	 */
 	protected transient DataInputViewStreamWrapper baisWrapper;
 
+	/**
+	 * Reusable OutputStream used to holding the serialized input elements.
+	 */
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	/**
+	 * OutputStream Wrapper.
+	 */
+	protected transient DataOutputViewStreamWrapper baosWrapper;

Review comment:
       can be private



