Posted to commits@airflow.apache.org by "e-galan (via GitHub)" <gi...@apache.org> on 2024/04/15 07:44:20 UTC

[PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

e-galan opened a new pull request, #39018:
URL: https://github.com/apache/airflow/pull/39018

   - Add new methods to `DataflowHook` for deferrable mode. Each method starts a Dataflow job and then returns the job data without waiting for the job to finish.
   - Push `job_id` to XCom in both sync and deferrable runs.
   - Update unit tests.
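
The second bullet can be illustrated with a minimal sketch. It is not the code from this PR: `SketchTemplatedJobOperator` and its `pushed` dict are hypothetical stand-ins, the hook is a mock, and the `"id"` key in the job data is an assumption. It only shows `job_id` being pushed on both the sync and the deferrable path.

```python
from unittest import mock


class SketchTemplatedJobOperator:
    """Hypothetical stand-in for an operator; not the Airflow class."""

    def __init__(self, hook, deferrable=False):
        self.hook = hook
        self.deferrable = deferrable
        self.pushed = {}  # stands in for the XCom backend

    def xcom_push(self, context, key, value):
        self.pushed[key] = value

    def execute(self, context):
        if self.deferrable:
            # Deferrable path: the hook only launches the job and returns its data.
            job = self.hook.launch_job_with_template()
            self.xcom_push(context, key="job_id", value=job["id"])
            # A real operator would hand over to a trigger at this point.
            return job
        # Sync path: the hook blocks until the job is done.
        job = self.hook.start_template_dataflow()
        self.xcom_push(context, key="job_id", value=job["id"])
        return job


hook = mock.Mock()
hook.start_template_dataflow.return_value = {"id": "sync-job"}
hook.launch_job_with_template.return_value = {"id": "deferred-job"}

sync_op = SketchTemplatedJobOperator(hook, deferrable=False)
sync_op.execute(context={})
deferred_op = SketchTemplatedJobOperator(hook, deferrable=True)
deferred_op.execute(context={})
```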
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "e-galan (via GitHub)" <gi...@apache.org>.
e-galan commented on PR #39018:
URL: https://github.com/apache/airflow/pull/39018#issuecomment-2063502941

   Hi @eladkal @Lee-W @potiuk !
   Could you take a look at the PR?




Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W merged PR #39018:
URL: https://github.com/apache/airflow/pull/39018




Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "e-galan (via GitHub)" <gi...@apache.org>.
e-galan commented on code in PR #39018:
URL: https://github.com/apache/airflow/pull/39018#discussion_r1577409161


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -736,6 +736,69 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @_fallback_to_location_from_variables
+    @_fallback_to_project_id_from_variables
+    @GoogleBaseHook.fallback_to_default_project_id
+    def launch_job_with_template(

Review Comment:
   @Lee-W I was thinking about it, but because of `name = self.build_dataflow_job_name(job_name, append_job_name)` on line 684, I can't do that without somehow changing the original method's parameters. Otherwise, the `launch_job_with_template` method would need to return both the _job name_ and the _job response_, while deferrable mode only needs the _job response_. So it wouldn't be exactly clean.
   
   However, if the goal is to minimize code duplication, there is one block that could be extracted into a separate method:
   
   ```python
   service: Resource = self.get_conn()
   
   request = (
       service.projects()
       .locations()
       .templates()
       .launch(
           projectId=project_id,
           location=location,
           gcsPath=dataflow_template,
           body={
               "jobName": name,
               "parameters": parameters,
               "environment": environment,
           },
       )
   )
   response = request.execute(num_retries=self.num_retries)
   
   job = response["job"]
   ```
   
   It can be used in both the `start_template_dataflow` and `launch_job_with_template` methods.
   
   Does it make sense to you?
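
A rough sketch of that extraction, under stated assumptions: `SketchDataflowHook` and the helper name `_launch_template` are hypothetical, and the Google API client is replaced by a `MagicMock`, so this only demonstrates the shape of the shared helper, not the provider's actual code.

```python
from unittest import mock


class SketchDataflowHook:
    """Hypothetical stand-in for the hook; only the shared launch step is shown."""

    num_retries = 5

    def __init__(self, service):
        self._service = service

    def get_conn(self):
        return self._service

    def _launch_template(
        self, name, project_id, location, dataflow_template, parameters, environment
    ):
        # Hypothetical helper name: wraps the request block from the comment above.
        request = (
            self.get_conn()
            .projects()
            .locations()
            .templates()
            .launch(
                projectId=project_id,
                location=location,
                gcsPath=dataflow_template,
                body={
                    "jobName": name,
                    "parameters": parameters,
                    "environment": environment,
                },
            )
        )
        response = request.execute(num_retries=self.num_retries)
        return response["job"]


# Fake API client: every chained call returns another mock.
service = mock.MagicMock()
launch = service.projects.return_value.locations.return_value.templates.return_value.launch
launch.return_value.execute.return_value = {"job": {"id": "job-123"}}

hook = SketchDataflowHook(service)
job = hook._launch_template(
    "my-job", "my-project", "us-central1", "gs://bucket/template", {}, {}
)
```

Both the blocking and the non-blocking method could then call the helper and differ only in what they do with the returned job.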





Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "e-galan (via GitHub)" <gi...@apache.org>.
e-galan commented on PR #39018:
URL: https://github.com/apache/airflow/pull/39018#issuecomment-2068922597

   > Could you please rebase and resolve conflict as well @e-galan ?
   
   Sure @potiuk, the PR is now rebased and the conflict is resolved.
   




Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "e-galan (via GitHub)" <gi...@apache.org>.
e-galan commented on code in PR #39018:
URL: https://github.com/apache/airflow/pull/39018#discussion_r1576404398


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -736,6 +736,69 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @_fallback_to_location_from_variables
+    @_fallback_to_project_id_from_variables
+    @GoogleBaseHook.fallback_to_default_project_id
+    def launch_job_with_template(

Review Comment:
   @Lee-W Yes, it was based on the `start_template_dataflow()` function. In my opinion, `start_template_dataflow()` has too much going on inside it. There are two conditional callback parameters, one of them deprecated, as well as the `jobs_controller` object; all of these are tied together and none of them is needed for deferrable mode.
   
   So I thought that instead of adding even more parameters and, by extension, additional ifs into the existing method, it would be better to write a separate hook method for the deferrable mode. That way the code that uses the existing method would not be affected, and it would be easier for us to maintain the operators in the future.
   
   What do you think?





Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "e-galan (via GitHub)" <gi...@apache.org>.
e-galan commented on code in PR #39018:
URL: https://github.com/apache/airflow/pull/39018#discussion_r1579071214


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -736,6 +736,69 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @_fallback_to_location_from_variables
+    @_fallback_to_project_id_from_variables
+    @GoogleBaseHook.fallback_to_default_project_id
+    def launch_job_with_template(

Review Comment:
   @Lee-W Updated. Please take a look.





Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on code in PR #39018:
URL: https://github.com/apache/airflow/pull/39018#discussion_r1577492150


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -736,6 +736,69 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @_fallback_to_location_from_variables
+    @_fallback_to_project_id_from_variables
+    @GoogleBaseHook.fallback_to_default_project_id
+    def launch_job_with_template(

Review Comment:
   Yep, sounds great!





Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "e-galan (via GitHub)" <gi...@apache.org>.
e-galan commented on code in PR #39018:
URL: https://github.com/apache/airflow/pull/39018#discussion_r1580752703


##########
tests/providers/google/cloud/operators/test_dataflow.py:
##########
@@ -540,8 +559,9 @@ def test_validation_deferrable_params_raises_error(self):
             DataflowTemplatedJobStartOperator(**init_kwargs)
 
     @pytest.mark.db_test
-    @mock.patch("airflow.providers.google.cloud.operators.dataflow.DataflowHook.start_template_dataflow")
-    def test_start_with_custom_region(self, dataflow_mock):
+    @mock.patch(f"{DATAFLOW_PATH}.DataflowTemplatedJobStartOperator.xcom_push")
+    @mock.patch(f"{DATAFLOW_PATH}.DataflowHook.start_template_dataflow")
+    def test_start_with_custom_region(self, dataflow_mock, mock_xcom_push):

Review Comment:
   I did not think we needed to assert whether every mock was called, so I added the `xcom_push` mock only to make the test pass. Since we're not testing `xcom_push` anywhere else, I think I can add the call assertion. Thanks!
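
For illustration, a call assertion on a patched `xcom_push` might look like the sketch below. `SketchOperator` is a hypothetical stand-in rather than `DataflowTemplatedJobStartOperator`, and the `job_id` key and `"id"` field are assumptions; only the `unittest.mock` calls are real API.

```python
from unittest import mock


class SketchOperator:
    """Hypothetical operator stand-in with the two collaborators the test mocks."""

    def __init__(self, hook):
        self.hook = hook

    def xcom_push(self, context, key, value):
        raise RuntimeError("needs a real task instance")

    def execute(self, context):
        job = self.hook.start_template_dataflow(location="europe-west1")
        self.xcom_push(context, key="job_id", value=job["id"])
        return job


def test_start_with_custom_region():
    hook = mock.Mock()
    hook.start_template_dataflow.return_value = {"id": "job-123"}
    context = {}
    with mock.patch.object(SketchOperator, "xcom_push") as mock_xcom_push:
        SketchOperator(hook).execute(context)
    # Assert on both mocks instead of patching xcom_push only to avoid the error.
    hook.start_template_dataflow.assert_called_once_with(location="europe-west1")
    mock_xcom_push.assert_called_once_with(context, key="job_id", value="job-123")


test_start_with_custom_region()
```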





Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "e-galan (via GitHub)" <gi...@apache.org>.
e-galan closed pull request #39018: Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator
URL: https://github.com/apache/airflow/pull/39018




Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on code in PR #39018:
URL: https://github.com/apache/airflow/pull/39018#discussion_r1580693521


##########
tests/providers/google/cloud/operators/test_dataflow.py:
##########
@@ -540,8 +559,9 @@ def test_validation_deferrable_params_raises_error(self):
             DataflowTemplatedJobStartOperator(**init_kwargs)
 
     @pytest.mark.db_test
-    @mock.patch("airflow.providers.google.cloud.operators.dataflow.DataflowHook.start_template_dataflow")
-    def test_start_with_custom_region(self, dataflow_mock):
+    @mock.patch(f"{DATAFLOW_PATH}.DataflowTemplatedJobStartOperator.xcom_push")
+    @mock.patch(f"{DATAFLOW_PATH}.DataflowHook.start_template_dataflow")
+    def test_start_with_custom_region(self, dataflow_mock, mock_xcom_push):

Review Comment:
   We're mocking `xcom_push` here. Should we assert that it's called?



##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -736,6 +736,69 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @_fallback_to_location_from_variables
+    @_fallback_to_project_id_from_variables
+    @GoogleBaseHook.fallback_to_default_project_id
+    def launch_job_with_template(

Review Comment:
   This is great refactoring!





Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on code in PR #39018:
URL: https://github.com/apache/airflow/pull/39018#discussion_r1577152401


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -736,6 +736,69 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @_fallback_to_location_from_variables
+    @_fallback_to_project_id_from_variables
+    @GoogleBaseHook.fallback_to_default_project_id
+    def launch_job_with_template(

Review Comment:
   Yep, it does make sense. Or should we reuse the logic from `launch_job_with_template` instead? Something like:
   
   ```python
   def start_template_dataflow(...):
       self.launch_job_with_template()
   
       # rest of the logic
   ```
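
Spelled out a little further, the delegation could be sketched as follows. This is an illustration under assumptions, not the merged code: `SketchJobsController`, the fake job response, and the simplified signatures are all hypothetical; only the method names and `wait_for_done()` come from the diff under review.

```python
class SketchJobsController:
    """Hypothetical stand-in for the object that polls the job until it finishes."""

    def __init__(self, job_id):
        self.job_id = job_id
        self.waited = False

    def wait_for_done(self):
        self.waited = True


class SketchDataflowHook:
    """Hypothetical hook with simplified signatures."""

    def __init__(self):
        self.controllers = []

    def launch_job_with_template(self, job_name):
        # Launch only: return the job data immediately (fake response here).
        return {"id": f"{job_name}-id", "name": job_name}

    def start_template_dataflow(self, job_name):
        # Reuse the launch step, then add the blocking wait on top of it.
        job = self.launch_job_with_template(job_name)
        controller = SketchJobsController(job["id"])
        self.controllers.append(controller)
        controller.wait_for_done()
        return job


blocking_hook = SketchDataflowHook()
blocking_job = blocking_hook.start_template_dataflow("demo")

launch_only_hook = SketchDataflowHook()
launched_job = launch_only_hook.launch_job_with_template("demo")
```

With this split, the deferrable path calls only the launch method, while the existing blocking method keeps its behavior for current callers.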





Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #39018:
URL: https://github.com/apache/airflow/pull/39018#issuecomment-2068855264

   Could you please rebase and resolve the conflict as well, @e-galan?




Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on code in PR #39018:
URL: https://github.com/apache/airflow/pull/39018#discussion_r1576315441


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -736,6 +736,69 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @_fallback_to_location_from_variables
+    @_fallback_to_project_id_from_variables
+    @GoogleBaseHook.fallback_to_default_project_id
+    def launch_job_with_template(

Review Comment:
   Looks like this function could be part of `start_template_dataflow`. Should we introduce a flag to `start_template_dataflow` to decide whether we want to wait?





Re: [PR] Fix deferrable mode for DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #39018:
URL: https://github.com/apache/airflow/pull/39018#issuecomment-2070996041

   Don't know all the details but it seems good for review.

