You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/30 08:45:26 UTC

[GitHub] [flink] HuangXingBo opened a new pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

HuangXingBo opened a new pull request #14524:
URL: https://github.com/apache/flink/pull/14524


   ## What is the purpose of the change
   
   *This pull request will Support minibatch to optimize Python UDAF*
   
   
   ## Brief change log
   
     - *Support minibatch to optimize Python UDAF*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *original tests*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   
   class MeanAggregateFunction(AggregateFunction):
   
       def get_value(self, accumulator: ACC) -> T:
           if accumulator[1] == 0:
               return None
           else:
               return accumulator[0] / accumulator[1]
   
       def create_accumulator(self) -> ACC:
           return [0, 0]
   
       def accumulate(self, accumulator: ACC, *args):
           accumulator[0] += args[0]
           accumulator[1] += 1
   
       def retract(self, accumulator: ACC, *args):
           accumulator[0] -= args[0]
           accumulator[1] -= 1
   
       def merge(self, accumulator: ACC, accumulators):
           for other_acc in accumulators:
               accumulator[0] += other_acc[0]
               accumulator[1] += other_acc[1]
   
       def get_accumulator_type(self) -> DataType:
           return DataTypes.ARRAY(DataTypes.BIGINT())
   
       def get_result_type(self) -> DataType:
           return DataTypes.FLOAT()
   
   ### Test Code
       env = StreamExecutionEnvironment.get_execution_environment()
       env.set_parallelism(1)
       env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
       environment_settings = EnvironmentSettings.new_instance().use_blink_planner().build()
       t_env = StreamTableEnvironment.create(env, environment_settings=environment_settings)
       t_env.get_config().get_configuration().set_integer("python.fn-execution.bundle.time", 1000) 
        t_env.get_config().get_configuration().set_boolean("pipeline.object-reuse", True)
   
       t_env.create_temporary_function("python_avg", MeanAggregateFunction())
       t_env.create_java_temporary_system_function("java_avg", "com.alibaba.flink.function.JavaAvg")
   
       num_rows = 10000000
   
       t_env.execute_sql(f"""
           CREATE TABLE source (
               id INT,
               num INT,
               rowtime TIMESTAMP(3),
               WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
           ) WITH (
             'connector' = 'Range',
             'start' = '1',
             'end' = '{num_rows}',
             'step' = '1',
             'partition' = '200'
           )
       """)
       t_env.register_table_sink(
           "sink",
           PrintTableSink(
               ["value"],
               [DataTypes.FLOAT(False)], 1000000))
       result = t_env.from_path("source") \
           .select("python_avg(id)")
       result.insert_into("sink")
       beg_time = time.time()
       t_env.execute("Python UDF")
       print("PyFlink stream group agg consume time: " + str(time.time() - beg_time))
   
   
   ## Test Results
   num rows, num colums |  Consume Time(Before) | Consume Time(After)
   1000w,3                      |     95.40s                         |    54.85s
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14524:
URL: https://github.com/apache/flink/pull/14524#issuecomment-752383882


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87f9a7aac8cbd730781b7cdcbacaa30b76f506fe",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11488",
       "triggerID" : "87f9a7aac8cbd730781b7cdcbacaa30b76f506fe",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e8975d6f66798fc42dca329b2019782367dc6686",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e8975d6f66798fc42dca329b2019782367dc6686",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87f9a7aac8cbd730781b7cdcbacaa30b76f506fe Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11488) 
   * e8975d6f66798fc42dca329b2019782367dc6686 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14524:
URL: https://github.com/apache/flink/pull/14524#issuecomment-752383882


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87f9a7aac8cbd730781b7cdcbacaa30b76f506fe",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11488",
       "triggerID" : "87f9a7aac8cbd730781b7cdcbacaa30b76f506fe",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87f9a7aac8cbd730781b7cdcbacaa30b76f506fe Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11488) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14524:
URL: https://github.com/apache/flink/pull/14524#issuecomment-752383882


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87f9a7aac8cbd730781b7cdcbacaa30b76f506fe",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11488",
       "triggerID" : "87f9a7aac8cbd730781b7cdcbacaa30b76f506fe",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e8975d6f66798fc42dca329b2019782367dc6686",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11531",
       "triggerID" : "e8975d6f66798fc42dca329b2019782367dc6686",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87f9a7aac8cbd730781b7cdcbacaa30b76f506fe Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11488) 
   * e8975d6f66798fc42dca329b2019782367dc6686 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11531) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14524:
URL: https://github.com/apache/flink/pull/14524#issuecomment-752383882


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87f9a7aac8cbd730781b7cdcbacaa30b76f506fe",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "87f9a7aac8cbd730781b7cdcbacaa30b76f506fe",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87f9a7aac8cbd730781b7cdcbacaa30b76f506fe UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14524:
URL: https://github.com/apache/flink/pull/14524#issuecomment-752378157


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit e8975d6f66798fc42dca329b2019782367dc6686 (Fri May 28 06:59:20 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14524:
URL: https://github.com/apache/flink/pull/14524#issuecomment-752383882


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "87f9a7aac8cbd730781b7cdcbacaa30b76f506fe",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11488",
       "triggerID" : "87f9a7aac8cbd730781b7cdcbacaa30b76f506fe",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 87f9a7aac8cbd730781b7cdcbacaa30b76f506fe Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11488) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] WeiZhong94 commented on a change in pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
WeiZhong94 commented on a change in pull request #14524:
URL: https://github.com/apache/flink/pull/14524#discussion_r550390434



##########
File path: flink-python/pyflink/fn_execution/aggregate_fast.pyx
##########
@@ -564,50 +583,68 @@ cdef class GroupTableAggFunction(GroupAggFunctionBase):
             aggs_handle, key_selector, state_backend, state_value_coder, generate_update_before,
             state_cleaning_enabled, index_of_count_star)
 
-    cpdef list process_element(self, InternalRow input_data):
+    cpdef list finish_bundle(self):
         cdef bint first_row
         cdef list key, accumulators, input_value, results
         cdef SimpleTableAggsHandleFunction aggs_handle
         cdef InternalRowKind input_row_kind
+        cdef tuple current_key
+        cdef InternalRow input_data
+        cdef size_t start_index, i, input_rows_num
+        cdef object state_backend, accumulator_state
         results = []
-        input_value = input_data.values
-        input_row_kind = input_data.row_kind
+        # input_value = input_data.values

Review comment:
       Unnecessary comment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14524:
URL: https://github.com/apache/flink/pull/14524#issuecomment-752378157


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 87f9a7aac8cbd730781b7cdcbacaa30b76f506fe (Wed Dec 30 08:48:05 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-20769).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] HuangXingBo commented on pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on pull request #14524:
URL: https://github.com/apache/flink/pull/14524#issuecomment-752873339


   @WeiZhong94 Thanks a lot for the review. I have addressed the comment at the latest commit.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dianfu closed pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
dianfu closed pull request #14524:
URL: https://github.com/apache/flink/pull/14524


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dianfu commented on pull request #14524: [FLINK-20769][python] Support minibatch to optimize Python UDAF

Posted by GitBox <gi...@apache.org>.
dianfu commented on pull request #14524:
URL: https://github.com/apache/flink/pull/14524#issuecomment-752913645


   Thanks @HuangXingBo for this great work and thanks @WeiZhong94 for the review. Have merged the PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org