Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/30 08:07:34 UTC

[GitHub] [flink] HuangXingBo opened a new pull request, #20396: [FLINK-28745][python] Support DataStream PythonCoProcessOperator and PythonKeyedCoProcessOperator in Thread Mode

HuangXingBo opened a new pull request, #20396:
URL: https://github.com/apache/flink/pull/20396

   ## What is the purpose of the change
   
   *This pull request adds support for the DataStream `PythonCoProcessOperator` and `PythonKeyedCoProcessOperator` in Thread Mode.*
   
   ## Brief change log
   
     - *Support DataStream PythonCoProcessOperator and PythonKeyedCoProcessOperator in Thread Mode*
   
   
   ## Verifying this change
   
   This change adds tests and can be verified as follows:
   
     - *`test_keyed_co_process` and `test_basic_co_operations` in `test_data_stream.py`*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Vancior commented on a diff in pull request #20396: [FLINK-28745][python] Support DataStream PythonCoProcessOperator and PythonKeyedCoProcessOperator in Thread Mode

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #20396:
URL: https://github.com/apache/flink/pull/20396#discussion_r934132299


##########
flink-python/pyflink/fn_execution/embedded/operations.py:
##########
@@ -59,17 +66,60 @@ def __init__(self,
                 timer_context,
                 job_parameters)
                 for serialized_fn in serialized_fns])
-        super(OneInputFunctionOperation, self).__init__(
-            operations, input_data_converter, output_data_converter)
+        super(OneInputFunctionOperation, self).__init__(operations, output_data_converter)
+        self._input_data_converter = input_data_converter
 
     def process_element(self, value):
-        def _process_element(op, elements):
-            for element in elements:
-                yield from op.process_element(element)
+        results = self._main_operation.process_element(
+            self._input_data_converter.to_internal(value))
 
-        value = [self._input_data_converter.to_internal(value)]
-        for operation in self._operations:
-            value = _process_element(operation, value)
+        for operation in self._chained_operations:
+            results = self._process_element(operation, results)
+
+        for item in results:

Review Comment:
   ditto



##########
flink-python/pyflink/fn_execution/embedded/operations.py:
##########
@@ -38,8 +38,15 @@ def close(self):
 
     def on_timer(self, timestamp):
         for operation in self._operations:
-            for item in operation.on_timer(timestamp):
-                yield self._output_data_converter.to_external(item)
+            results = operation.on_timer(timestamp)

Review Comment:
   Shouldn't the results be fed into the following operations?
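
A minimal sketch of what this question is getting at, using hypothetical stand-in classes (`TimerSource`, `Doubler`) rather than the real PyFlink operations: whatever the first operation emits when a timer fires is passed through each following operation in the chain before it is emitted. The helper names are assumptions made for illustration only.

```python
class TimerSource:
    """Hypothetical stand-in for the main operation that owns the timers."""

    def on_timer(self, timestamp):
        yield timestamp
        yield timestamp + 1


class Doubler:
    """Hypothetical stand-in for an operation chained after the main one."""

    def process_element(self, value):
        yield value * 2


def _process_all(op, items):
    # Lazily run every upstream item through one chained operation.
    for item in items:
        yield from op.process_element(item)


def on_timer(main_operation, chained_operations, timestamp):
    # Start with what the main operation emits when the timer fires.
    results = main_operation.on_timer(timestamp)
    # Feed those results through each following operation, in order.
    for op in chained_operations:
        results = _process_all(op, results)
    yield from results


fired = list(on_timer(TimerSource(), [Doubler(), Doubler()], 10))
```

With two `Doubler` operations chained after the source, each timer output is multiplied by four before it leaves the chain, which would not happen if the timer results were emitted directly.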



##########
flink-python/pyflink/fn_execution/embedded/operations.py:
##########
@@ -59,17 +66,60 @@ def __init__(self,
                 timer_context,
                 job_parameters)
                 for serialized_fn in serialized_fns])
-        super(OneInputFunctionOperation, self).__init__(
-            operations, input_data_converter, output_data_converter)
+        super(OneInputFunctionOperation, self).__init__(operations, output_data_converter)
+        self._input_data_converter = input_data_converter
 
     def process_element(self, value):
-        def _process_element(op, elements):
-            for element in elements:
-                yield from op.process_element(element)
+        results = self._main_operation.process_element(
+            self._input_data_converter.to_internal(value))
 
-        value = [self._input_data_converter.to_internal(value)]
-        for operation in self._operations:
-            value = _process_element(operation, value)
+        for operation in self._chained_operations:
+            results = self._process_element(operation, results)

Review Comment:
   Should this check for `None`?





[GitHub] [flink] HuangXingBo commented on a diff in pull request #20396: [FLINK-28745][python] Support DataStream PythonCoProcessOperator and PythonKeyedCoProcessOperator in Thread Mode

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on code in PR #20396:
URL: https://github.com/apache/flink/pull/20396#discussion_r934341140


##########
flink-python/pyflink/fn_execution/embedded/operations.py:
##########
@@ -59,17 +66,60 @@ def __init__(self,
                 timer_context,
                 job_parameters)
                 for serialized_fn in serialized_fns])
-        super(OneInputFunctionOperation, self).__init__(
-            operations, input_data_converter, output_data_converter)
+        super(OneInputFunctionOperation, self).__init__(operations, output_data_converter)
+        self._input_data_converter = input_data_converter
 
     def process_element(self, value):
-        def _process_element(op, elements):
-            for element in elements:
-                yield from op.process_element(element)
+        results = self._main_operation.process_element(
+            self._input_data_converter.to_internal(value))
 
-        value = [self._input_data_converter.to_internal(value)]
-        for operation in self._operations:
-            value = _process_element(operation, value)
+        for operation in self._chained_operations:
+            results = self._process_element(operation, results)

Review Comment:
   Yes. Good catch



##########
flink-python/pyflink/fn_execution/embedded/operations.py:
##########
@@ -38,8 +38,15 @@ def close(self):
 
     def on_timer(self, timestamp):
         for operation in self._operations:
-            for item in operation.on_timer(timestamp):
-                yield self._output_data_converter.to_external(item)
+            results = operation.on_timer(timestamp)

Review Comment:
   Yes. Good catch.





[GitHub] [flink] HuangXingBo commented on pull request #20396: [FLINK-28745][python] Support DataStream PythonCoProcessOperator and PythonKeyedCoProcessOperator in Thread Mode

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on PR #20396:
URL: https://github.com/apache/flink/pull/20396#issuecomment-1200393104

   @flinkbot run azure




[GitHub] [flink] Vancior commented on a diff in pull request #20396: [FLINK-28745][python] Support DataStream PythonCoProcessOperator and PythonKeyedCoProcessOperator in Thread Mode

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #20396:
URL: https://github.com/apache/flink/pull/20396#discussion_r935187966


##########
flink-python/pyflink/fn_execution/embedded/operations.py:
##########
@@ -37,8 +37,28 @@ def close(self):
             operation.close()
 
     def on_timer(self, timestamp):
-        for operation in self._operations:
-            for item in operation.on_timer(timestamp):
+        results = self._main_operation.on_timer(timestamp)
+
+        if results:
+            results = self._process_elements(results)
+
+        if results:
+            yield from self._output_elements(results)
+
+    def _process_elements(self, elements):
+        def _process_elements_on_operation(op, items):
+            if items:
+                for item in items:
+                    yield from op.process_element(item)

Review Comment:
   Don't we still need to check whether `process_element` returns `None`?
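
A minimal sketch of the concern, assuming a hypothetical operation (`MaybeEmits`) whose `process_element` sometimes returns `None` instead of an iterable. In plain Python, `yield from None` raises `TypeError`, so the inner loop needs its own guard even when `items` has already been checked. This is an illustration, not the code that was merged.

```python
class MaybeEmits:
    """Hypothetical operation: returns None for odd inputs, a list otherwise."""

    def process_element(self, value):
        if value % 2:
            return None
        return [value * 10]


def process_elements_on_operation(op, items):
    if items:
        for item in items:
            results = op.process_element(item)
            # Skip None so that `yield from` never sees a non-iterable.
            if results is not None:
                yield from results


out = list(process_elements_on_operation(MaybeEmits(), [1, 2, 3, 4]))
```

Only the even inputs produce output here; the odd ones return `None` and are skipped rather than raising.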





[GitHub] [flink] flinkbot commented on pull request #20396: [FLINK-28745][python] Support DataStream PythonCoProcessOperator and PythonKeyedCoProcessOperator in Thread Mode

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20396:
URL: https://github.com/apache/flink/pull/20396#issuecomment-1200115276

   ## CI report:
   
   * 650e7b2a693a8f6989f6ad20e17ceca5ef6a8359 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] HuangXingBo commented on pull request #20396: [FLINK-28745][python] Support DataStream PythonCoProcessOperator and PythonKeyedCoProcessOperator in Thread Mode

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on PR #20396:
URL: https://github.com/apache/flink/pull/20396#issuecomment-1200393131

   @flinkbot run azure




[GitHub] [flink] HuangXingBo closed pull request #20396: [FLINK-28745][python] Support DataStream PythonCoProcessOperator and PythonKeyedCoProcessOperator in Thread Mode

Posted by GitBox <gi...@apache.org>.
HuangXingBo closed pull request #20396: [FLINK-28745][python] Support DataStream PythonCoProcessOperator and PythonKeyedCoProcessOperator in Thread Mode
URL: https://github.com/apache/flink/pull/20396

