Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/22 22:10:35 UTC

[GitHub] [airflow] megan-parker opened a new pull request #19758: Api add support bulk pause resume

megan-parker opened a new pull request #19758:
URL: https://github.com/apache/airflow/pull/19758


   Currently, the Airflow REST API can pause or resume (update) only one DAG per request, but we often need to pause or resume DAGs in bulk. Rather than maintain a separate, fragile set of custom endpoints, it makes sense to update the `patch_dag` endpoint to handle bulk requests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r756703569



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -77,7 +77,7 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
         cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
         dags_query = dags_query.filter(or_(*cond))
 
-    total_entries = len(dags_query.all())
+    total_entries = dags_query.count()

Review comment:
       I don’t think this is an issue here because `dags_query` does not have things like JOIN and shouldn’t need to be deduplicated. And even if it does, we arguably _should_ add deduplication logic to `dags_query` itself (otherwise offset and limit won’t work right), which will make `count()` correct in the end.
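
The difference between the two lines in the hunk can be illustrated without SQLAlchemy. This is a minimal sketch using the standard-library `sqlite3` module; the `dag` table and its rows are made up for illustration and are not Airflow's schema. With a single table and no JOIN, counting in the database and counting fetched rows should agree, and only the amount of data transferred differs.

```python
import sqlite3

# Hypothetical single-table setup (not Airflow's real schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_active INTEGER)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?)",
    [("etl_daily", 1), ("etl_hourly", 1), ("legacy_report", 0)],
)

# Analogue of len(query.all()): every matching row is transferred to Python.
rows = conn.execute("SELECT dag_id FROM dag WHERE is_active = 1").fetchall()
count_in_python = len(rows)

# Analogue of query.count(): the database returns a single integer.
(count_in_db,) = conn.execute(
    "SELECT COUNT(*) FROM dag WHERE is_active = 1"
).fetchone()
```

If a JOIN were added later and produced duplicate rows, both counts would be inflated in the same way here, which is why deduplicating in the query itself is the more robust fix.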







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#issuecomment-1066506678


   Awesome work, congrats on your first merged pull request!
   





[GitHub] [airflow] megan-parker commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
megan-parker commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r821882513



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -100,25 +100,67 @@ def get_dags(
 @provide_session
 def patch_dag(*, dag_id: str, update_mask: UpdateMask = None, session: Session = NEW_SESSION) -> APIResponse:
     """Update the specific DAG"""
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if update_mask != ['is_paused']:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        patch_body_[update_mask[0]] = patch_body[update_mask[0]]
+        patch_body = patch_body_
     dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one_or_none()
     if not dag:
         raise NotFound(f"Dag with id: '{dag_id}' not found")
+    dag.is_paused = patch_body['is_paused']
+    session.flush()
+    return dag_schema.dump(dag)
+
+
+@security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch multiple DAGs."""
     try:
         patch_body = dag_schema.load(request.json, session=session)
     except ValidationError as err:
-        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+        raise BadRequest(detail=str(err.messages))
     if update_mask:
         patch_body_ = {}
-        if len(update_mask) > 1:
+        if update_mask != ['is_paused']:
             raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
         update_mask = update_mask[0]
-        if update_mask != 'is_paused':
-            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
         patch_body_[update_mask] = patch_body[update_mask]
         patch_body = patch_body_
-    setattr(dag, 'is_paused', patch_body['is_paused'])
-    session.commit()
-    return dag_schema.dump(dag)
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern == '~':
+        dag_id_pattern = '%'

Review comment:
       `dag_id_pattern` should never be `None` since it is a required parameter. 
   When I pass `None` I get the following response: `Dag with id: ''None'' not found`.
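
As a rough illustration of the wildcard handling in the hunk above: the sketch below assumes `~` is rewritten to `%` and the result is then wrapped in a substring `LIKE` match. The second step is taken from an earlier revision of `patch_dags` in this thread, not from this hunk, and the table and helper names are hypothetical.

```python
import sqlite3
from typing import List


def like_pattern(dag_id_pattern: str) -> str:
    """Translate the API's dag_id_pattern into a SQL LIKE pattern.

    Assumption: '~' is the wildcard meaning "all DAGs".
    """
    if dag_id_pattern == "~":
        dag_id_pattern = "%"
    return f"%{dag_id_pattern}%"


# Hypothetical table, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
conn.executemany(
    "INSERT INTO dag VALUES (?)", [("etl_daily",), ("etl_hourly",), ("report",)]
)


def matching_dag_ids(dag_id_pattern: str) -> List[str]:
    """Return the ids selected by a pattern, in a stable order."""
    cur = conn.execute(
        "SELECT dag_id FROM dag WHERE dag_id LIKE ? ORDER BY dag_id",
        (like_pattern(dag_id_pattern),),
    )
    return [row[0] for row in cur]
```

Under these assumptions, a literal string such as `"None"` is treated as an ordinary substring and selects nothing unless a DAG id happens to contain it.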







[GitHub] [airflow] ashb commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r764265801



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,38 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
-          in: query
-          schema:
-            type: boolean
-            default: true
-          required: false
-          description: |
-            Only return active DAGs.
-
-            *New in version 2.1.1*
-        - name: dag_id_pattern
-          in: query
-          schema:
-            type: string
-          required: false
-          description: |
-            If set, only return DAGs with dag_ids matching this pattern.
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
-            *New in version 2.3.0*
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'

Review comment:
       So long as it's documented, sounds good







[GitHub] [airflow] uranusjr commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r755338257



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -86,10 +86,13 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
 
 @security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
 @provide_session
-def patch_dag(session, dag_id, update_mask=None):
+def patch_dag(session, dag_id, update_mask=None, multi=False):

Review comment:
       Agreed, an argument to filter the list endpoint sounds like a good thing to have on its own.







[GitHub] [airflow] SamWheating commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r755582594



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -108,6 +108,48 @@ def patch_dag(session, dag_id, update_mask=None):
     session.commit()
     return dag_schema.dump(dag)
 
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch multiple DAGs."""
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern:
+        dags_query = dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
+
+    editable_dags = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
+
+    dags_query = dags_query.filter(DagModel.dag_id.in_(editable_dags))
+    if tags:
+        cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
+        dags_query = dags_query.filter(or_(*cond))
+
+    total_entries = len(dags_query.all())
+
+    dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all()
+
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if len(update_mask) > 1:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        update_mask = update_mask[0]
+        if update_mask != 'is_paused':
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        patch_body_[update_mask] = patch_body[update_mask]
+        patch_body = patch_body_
+    for dag in dags:

Review comment:
       Oh - maybe we can just fetch the list of DAGs to-be-paused and then mass-update them by filtering on the set?
   
   ```python
   dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all()
   
   dags_to_pause = {dag.dag_id for dag in dags}
   
   # and then later:
   session.query(DagModel).filter(DagModel.dag_id.in_(dags_to_pause)).update({DagModel.is_paused: is_paused}, synchronize_session='fetch')
   ```
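
The same two-step idea (select the page of ids, then issue one bulk `UPDATE`) can be tried end to end with the standard-library `sqlite3` module. This is only a sketch: the `dag` table, the `pause_page` helper and its parameters are hypothetical stand-ins, not the PR's code.

```python
import sqlite3

# Hypothetical table with five unpaused DAGs.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER)")
conn.executemany("INSERT INTO dag VALUES (?, 0)", [(f"dag_{i}",) for i in range(5)])


def pause_page(limit: int, offset: int, is_paused: bool) -> list:
    # Step 1: fetch the page of DAG ids selected by the request.
    ids = [
        row[0]
        for row in conn.execute(
            "SELECT dag_id FROM dag ORDER BY dag_id LIMIT ? OFFSET ?",
            (limit, offset),
        )
    ]
    if not ids:
        return ids
    # Step 2: one UPDATE ... WHERE dag_id IN (...) instead of a per-row loop.
    placeholders = ",".join("?" for _ in ids)
    conn.execute(
        f"UPDATE dag SET is_paused = ? WHERE dag_id IN ({placeholders})",
        (int(is_paused), *ids),
    )
    return ids


updated = pause_page(limit=2, offset=1, is_paused=True)
paused = [
    row[0]
    for row in conn.execute(
        "SELECT dag_id FROM dag WHERE is_paused = 1 ORDER BY dag_id"
    )
]
```

Only the rows on the requested page change; the rest of the table is untouched.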







[GitHub] [airflow] SamWheating commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r755363804



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -441,6 +441,46 @@ paths:
         '401':
           $ref: '#/components/responses/Unauthenticated'
 
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      tags: [DAG]
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/OrderBy'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - name: only_active
+          in: query
+          schema:
+            type: boolean
+            default: true
+          required: false
+          description: |
+            Only return active DAGs.
+
+            *New in version 2.1.1*

Review comment:
       ```suggestion
               Only update active DAGs.
   ```
   

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -441,6 +441,46 @@ paths:
         '401':
           $ref: '#/components/responses/Unauthenticated'
 
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      tags: [DAG]
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/OrderBy'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - name: only_active
+          in: query
+          schema:
+            type: boolean
+            default: true
+          required: false
+          description: |
+            Only return active DAGs.
+
+            *New in version 2.1.1*
+        - name: dag_id_pattern
+          in: query
+          schema:
+            type: string
+          required: false
+          description: |
+            If set, only return DAGs with dag_ids matching this pattern.
+
+            *New in version 2.3.0*

Review comment:
       ```suggestion
               If set, only update DAGs with dag_ids matching this pattern.
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r763707903



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -88,25 +88,67 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
 @provide_session
 def patch_dag(session, dag_id, update_mask=None):
     """Update the specific DAG"""
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if update_mask != ['is_paused']:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        update_mask = update_mask[0]
+        patch_body_[update_mask] = patch_body[update_mask]

Review comment:
       ```suggestion
           if update_mask != ['is_paused']:
               raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
           patch_body_[update_mask[0]] = patch_body[update_mask[0]]
   ```
   
   Pre-emptively fix the Mypy issue that eventually needs to be fixed by the ongoing Mypy-enabling effort 🙂 
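
The suggested shape can be exercised in isolation. In the sketch below, `BadRequest` is a stand-in exception and `apply_update_mask` is a hypothetical helper, neither taken from the PR. The likely Mypy complaint is that `update_mask = update_mask[0]` rebinds a list-typed name to a string; indexing in place avoids the rebinding.

```python
class BadRequest(Exception):
    """Stand-in for the API's BadRequest exception (for this sketch only)."""


def apply_update_mask(patch_body: dict, update_mask=None) -> dict:
    """Return the part of patch_body selected by update_mask.

    Only the exact mask ['is_paused'] is accepted; no mask means the
    body is used as-is.
    """
    if update_mask:
        if update_mask != ["is_paused"]:
            raise BadRequest(
                "Only `is_paused` field can be updated through the REST API"
            )
        # Index the list in place rather than rebinding update_mask to a str.
        return {update_mask[0]: patch_body[update_mask[0]]}
    return patch_body
```

Comparing against the whole list `['is_paused']` covers both the "more than one field" case and the "wrong field" case in a single check.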







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r763406321



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -445,27 +428,22 @@ paths:
       summary: Update DAGs
       x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
       operationId: patch_dags
-      tags: [DAG]
       parameters:
         - $ref: '#/components/parameters/PageLimit'
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/FilterTags'
         - $ref: '#/components/parameters/UpdateMask'
-        - name: only_active
-          in: query
-          schema:
-            type: boolean
-            default: true
-          required: false
-          description: |
-            Only update active DAGs.
-        - name: dag_id_pattern
-          in: query
-          schema:
-            type: string
-          required: false
-          description: |
-            If set, only update DAGs with dag_ids matching this pattern.
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'
+      tags: [DAG]
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/DAGCollection'

Review comment:
       Only `is_paused` can be updated, but because of how the generated clients work, the request body has to reference the DAG object so that clients generate a correct request body. For example, we don't want a separate object containing just `is_paused` when `is_paused` is already part of DAG and we already have a DAG object. The API docs render this correctly by filtering out read-only fields.
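
From a client's point of view, a bulk pause request might be assembled as below. This is a sketch under assumptions: the base URL and the `/dags` path are hypothetical, the query parameter names are taken from the `patch_dags` signature in this thread, and the server is assumed to accept a partial DAG object as the body. The request is only built, never sent.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://localhost:8080/api/v1"  # hypothetical deployment


def build_bulk_pause_request(dag_id_pattern: str, is_paused: bool) -> Request:
    """Build (but do not send) a PATCH request that updates only is_paused."""
    query = urlencode({"dag_id_pattern": dag_id_pattern, "update_mask": "is_paused"})
    # The body is a DAG object that carries just the one writable field.
    body = json.dumps({"is_paused": is_paused}).encode("utf-8")
    return Request(
        f"{BASE_URL}/dags?{query}",
        data=body,
        method="PATCH",
        headers={"Content-Type": "application/json"},
    )


req = build_bulk_pause_request("etl", True)
```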







[GitHub] [airflow] megan-parker commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
megan-parker commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r757667666



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -445,27 +428,22 @@ paths:
       summary: Update DAGs
       x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
       operationId: patch_dags
-      tags: [DAG]
       parameters:
         - $ref: '#/components/parameters/PageLimit'
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/FilterTags'
         - $ref: '#/components/parameters/UpdateMask'
-        - name: only_active
-          in: query
-          schema:
-            type: boolean
-            default: true
-          required: false
-          description: |
-            Only update active DAGs.
-        - name: dag_id_pattern
-          in: query
-          schema:
-            type: string
-          required: false
-          description: |
-            If set, only update DAGs with dag_ids matching this pattern.
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'
+      tags: [DAG]
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/DAGCollection'

Review comment:
       @ephraimbuddy I made this change, but am wondering if you can clarify why the request body contains a DAG or DAGCollection schema? It doesn't seem like there is a DAG schema being passed in the request.







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#issuecomment-975961950


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] ashb commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r764266279



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,38 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
-          in: query
-          schema:
-            type: boolean
-            default: true
-          required: false
-          description: |
-            Only return active DAGs.
-
-            *New in version 2.1.1*
-        - name: dag_id_pattern
-          in: query
-          schema:
-            type: string
-          required: false
-          description: |
-            If set, only return DAGs with dag_ids matching this pattern.
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
-            *New in version 2.3.0*
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'

Review comment:
       This only applies to the bulk patch endpoint, others can stay as they are







[GitHub] [airflow] uranusjr commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r818342217



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -100,25 +100,67 @@ def get_dags(
 @provide_session
 def patch_dag(*, dag_id: str, update_mask: UpdateMask = None, session: Session = NEW_SESSION) -> APIResponse:
     """Update the specific DAG"""
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if update_mask != ['is_paused']:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        patch_body_[update_mask[0]] = patch_body[update_mask[0]]
+        patch_body = patch_body_
     dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one_or_none()
     if not dag:
         raise NotFound(f"Dag with id: '{dag_id}' not found")
+    dag.is_paused = patch_body['is_paused']
+    session.flush()
+    return dag_schema.dump(dag)
+
+
+@security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch multiple DAGs."""
     try:
         patch_body = dag_schema.load(request.json, session=session)
     except ValidationError as err:
-        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+        raise BadRequest(detail=str(err.messages))
     if update_mask:
         patch_body_ = {}
-        if len(update_mask) > 1:
+        if update_mask != ['is_paused']:
             raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
         update_mask = update_mask[0]
-        if update_mask != 'is_paused':
-            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
         patch_body_[update_mask] = patch_body[update_mask]
         patch_body = patch_body_
-    setattr(dag, 'is_paused', patch_body['is_paused'])
-    session.commit()
-    return dag_schema.dump(dag)
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern == '~':
+        dag_id_pattern = '%'

Review comment:
       The current behaviour is to update all DAGs. So yeah this new logic seems wrong in that case, good catch.







[GitHub] [airflow] uranusjr commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r756359399



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -109,6 +109,53 @@ def patch_dag(session, dag_id, update_mask=None):
     return dag_schema.dump(dag)
 
 
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch multiple DAGs."""
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern:
+        dags_query = dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
+
+    editable_dags = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
+
+    dags_query = dags_query.filter(DagModel.dag_id.in_(editable_dags))
+    if tags:
+        cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
+        dags_query = dags_query.filter(or_(*cond))
+
+    total_entries = len(dags_query.all())

Review comment:
       Uh. I noticed `get_dags` also does this. It’s slow and we should use `func.count()` instead. Could you change this?

##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -109,6 +109,53 @@ def patch_dag(session, dag_id, update_mask=None):
     return dag_schema.dump(dag)
 
 
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch multiple DAGs."""
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern:
+        dags_query = dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
+
+    editable_dags = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
+
+    dags_query = dags_query.filter(DagModel.dag_id.in_(editable_dags))
+    if tags:
+        cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
+        dags_query = dags_query.filter(or_(*cond))
+
+    total_entries = len(dags_query.all())
+
+    dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all()
+
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if len(update_mask) > 1:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        update_mask = update_mask[0]
+        if update_mask != 'is_paused':
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        patch_body_[update_mask] = patch_body[update_mask]
+        patch_body = patch_body_
+    dags_to_update = {dag.dag_id for dag in dags}
+    session.query(DagModel).filter(DagModel.dag_id.in_(dags_to_update)).update(
+        {DagModel.is_paused: patch_body['is_paused']}, synchronize_session='fetch'
+    )
+
+    session.commit()

Review comment:
       This can be a `session.flush()` and delay the commit (which is done after the function finishes, by `provide_session`).
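
A rough sketch of the flush-then-commit ordering described here. `FakeSession` and `provide_session_sketch` are hypothetical stand-ins written for this illustration; they are not Airflow's `provide_session` implementation and only mimic the "commit after the function finishes" behaviour the comment refers to.

```python
import functools


class FakeSession:
    """Records calls so the ordering is visible; not a real database session."""

    def __init__(self):
        self.calls = []

    def flush(self):
        self.calls.append("flush")

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


def provide_session_sketch(func):
    """Hypothetical decorator: create a session, commit on success, roll back on error."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        session = FakeSession()
        try:
            result = func(*args, session=session, **kwargs)
            session.commit()
        except Exception:
            session.rollback()
            raise
        # The session is returned only so the call order can be inspected here.
        return result, session

    return wrapper


@provide_session_sketch
def pause_dags(dag_ids, session):
    # Pending changes are sent to the database, but the transaction stays open.
    session.flush()
    return sorted(dag_ids)


result, session = pause_dags({"b", "a"})
```

With this shape the endpoint body never commits itself, so an exception raised later in the request still rolls the whole change back.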

##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -109,6 +109,53 @@ def patch_dag(session, dag_id, update_mask=None):
     return dag_schema.dump(dag)
 
 
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch multiple DAGs."""
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern:
+        dags_query = dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
+
+    editable_dags = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
+
+    dags_query = dags_query.filter(DagModel.dag_id.in_(editable_dags))
+    if tags:
+        cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
+        dags_query = dags_query.filter(or_(*cond))
+
+    total_entries = len(dags_query.all())
+
+    dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all()
+
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if len(update_mask) > 1:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        update_mask = update_mask[0]
+        if update_mask != 'is_paused':
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")

Review comment:
       Can this repeated check be simplified to this?
   
   ```python
   if update_mask != ["is_paused"]:
       raise ...
   ```
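
A small check that the single comparison accepts and rejects the same masks as the two-step version in the diff. The helper names are made up for this sketch; both helpers assume a non-empty mask, since the original code only runs the check inside `if update_mask:`.

```python
def validate_mask_two_checks(update_mask):
    """Mirror of the two-step check in the diff; True means the mask is accepted."""
    if len(update_mask) > 1:
        return False
    if update_mask[0] != "is_paused":
        return False
    return True


def validate_mask_single_check(update_mask):
    """The suggested form: one list comparison."""
    return update_mask == ["is_paused"]


cases = [
    ["is_paused"],
    ["is_paused", "tags"],
    ["tags"],
    ["is_paused", "is_paused"],
]
results = [
    (validate_mask_two_checks(mask), validate_mask_single_check(mask))
    for mask in cases
]
```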

##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -109,6 +109,53 @@ def patch_dag(session, dag_id, update_mask=None):
     return dag_schema.dump(dag)
 
 
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch multiple DAGs."""
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern:
+        dags_query = dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
+
+    editable_dags = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
+
+    dags_query = dags_query.filter(DagModel.dag_id.in_(editable_dags))
+    if tags:
+        cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
+        dags_query = dags_query.filter(or_(*cond))
+
+    total_entries = len(dags_query.all())
+
+    dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all()
+
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if len(update_mask) > 1:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        update_mask = update_mask[0]
+        if update_mask != 'is_paused':
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        patch_body_[update_mask] = patch_body[update_mask]
+        patch_body = patch_body_

Review comment:
       This block should be moved to the beginning of the endpoint, so an invalid payload would not need to trigger database queries unnecessarily.
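
A sketch of the ordering suggested here: validate the payload and mask first, and only then run the query. `BadRequest`, `patch_dags_sketch`, and `fake_query` are hypothetical names for this illustration and do not reproduce the endpoint's real schema handling.

```python
class BadRequest(Exception):
    """Stand-in for the API's 400 error; hypothetical, for illustration only."""


def patch_dags_sketch(payload, update_mask, run_query):
    # 1. Validate the request first; nothing here touches the database.
    if not isinstance(payload.get("is_paused"), bool):
        raise BadRequest("`is_paused` must be a boolean")
    if update_mask and update_mask != ["is_paused"]:
        raise BadRequest("Only `is_paused` field can be updated through the REST API")
    # 2. Only a valid request reaches the (potentially expensive) query.
    return run_query(payload["is_paused"])


query_calls = []


def fake_query(is_paused):
    # Records each call so we can see whether the "database" was hit.
    query_calls.append(is_paused)
    return ["dag_a", "dag_b"]


try:
    patch_dags_sketch({"is_paused": True}, ["tags"], fake_query)
    rejected = False
except BadRequest:
    rejected = True

calls_after_invalid = list(query_calls)
updated = patch_dags_sketch({"is_paused": True}, ["is_paused"], fake_query)
```

The invalid request is rejected before `fake_query` is ever called, which is the saving the comment is after.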







[GitHub] [airflow] megan-parker commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
megan-parker commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r765286046



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,38 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
-          in: query
-          schema:
-            type: boolean
-            default: true
-          required: false
-          description: |
-            Only return active DAGs.
-
-            *New in version 2.1.1*
-        - name: dag_id_pattern
-          in: query
-          schema:
-            type: string
-          required: false
-          description: |
-            If set, only return DAGs with dag_ids matching this pattern.
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
-            *New in version 2.3.0*
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'

Review comment:
       Does that mean that the bulk `get` endpoint should continue to treat `dag_id_pattern` as optional and default to returning all DAGs if no pattern is provided?







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r818233478



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -100,25 +100,67 @@ def get_dags(
 @provide_session
 def patch_dag(*, dag_id: str, update_mask: UpdateMask = None, session: Session = NEW_SESSION) -> APIResponse:
     """Update the specific DAG"""
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if update_mask != ['is_paused']:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        patch_body_[update_mask[0]] = patch_body[update_mask[0]]
+        patch_body = patch_body_
     dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one_or_none()
     if not dag:
         raise NotFound(f"Dag with id: '{dag_id}' not found")
+    dag.is_paused = patch_body['is_paused']
+    session.flush()
+    return dag_schema.dump(dag)
+
+
+@security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch multiple DAGs."""
     try:
         patch_body = dag_schema.load(request.json, session=session)
     except ValidationError as err:
-        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+        raise BadRequest(detail=str(err.messages))
     if update_mask:
         patch_body_ = {}
-        if len(update_mask) > 1:
+        if update_mask != ['is_paused']:
             raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
         update_mask = update_mask[0]
-        if update_mask != 'is_paused':
-            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
         patch_body_[update_mask] = patch_body[update_mask]
         patch_body = patch_body_
-    setattr(dag, 'is_paused', patch_body['is_paused'])
-    session.commit()
-    return dag_schema.dump(dag)
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern == '~':
+        dag_id_pattern = '%'

Review comment:
       What happens when `dag_id_pattern` is None? Will it update all DAGs or not?
   I'll still do some real-world testing, though; just curious.







[GitHub] [airflow] uranusjr commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r754951374



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -86,10 +86,13 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
 
 @security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
 @provide_session
-def patch_dag(session, dag_id, update_mask=None):
+def patch_dag(session, dag_id, update_mask=None, multi=False):

Review comment:
       This `multi` flag does not look right to me. Bulk operations need their own endpoint instead, IMO.







[GitHub] [airflow] SamWheating commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r755367295



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -108,6 +108,48 @@ def patch_dag(session, dag_id, update_mask=None):
     session.commit()
     return dag_schema.dump(dag)
 
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch all DAGs."""
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern:
+        dags_query = dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
+
+    readable_dags = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)

Review comment:
       ```suggestion
       editable_dags = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
   ```
   
   Since `get_accessible_dag_ids` returns any DAGs which are editable _or_ readable. 







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r756686738



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -88,25 +88,68 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
 @provide_session
 def patch_dag(session, dag_id, update_mask=None):
     """Update the specific DAG"""
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if update_mask != ['is_paused']:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        update_mask = update_mask[0]
+        patch_body_[update_mask] = patch_body[update_mask]
+        patch_body = patch_body_
     dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one_or_none()
     if not dag:
         raise NotFound(f"Dag with id: '{dag_id}' not found")
+    setattr(dag, 'is_paused', patch_body['is_paused'])
+    session.flush()
+    return dag_schema.dump(dag)
+
+
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])

Review comment:
       ```suggestion
   @security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
   ```

##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -77,7 +77,7 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
         cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
         dags_query = dags_query.filter(or_(*cond))
 
-    total_entries = len(dags_query.all())
+    total_entries = dags_query.count()

Review comment:
       This may give us issues later (not sure); see https://docs.sqlalchemy.org/en/14/faq/sessions.html#faq-query-deduplicating
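
A sketch of the kind of mismatch the linked FAQ entry describes: a row count over a join can exceed the number of distinct entities. It uses the standard-library `sqlite3` module with made-up tables, not SQLAlchemy or Airflow's schema, and whether the concern applies to the query in this diff depends on how that query is built.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY);
    CREATE TABLE dag_tag (dag_id TEXT, name TEXT);
    INSERT INTO dag VALUES ('etl'), ('report');
    INSERT INTO dag_tag VALUES ('etl', 'daily'), ('etl', 'prod'), ('report', 'prod');
    """
)

joined = "FROM dag JOIN dag_tag ON dag.dag_id = dag_tag.dag_id"

# Counts joined rows: 'etl' appears once per tag.
row_count = conn.execute(f"SELECT COUNT(*) {joined}").fetchone()[0]

# Counts each DAG once, regardless of how many tags matched.
distinct_count = conn.execute(
    f"SELECT COUNT(DISTINCT dag.dag_id) {joined}"
).fetchone()[0]
```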

##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -88,25 +88,68 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
 @provide_session
 def patch_dag(session, dag_id, update_mask=None):
     """Update the specific DAG"""
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest("Invalid Dag schema", detail=str(err.messages))

Review comment:
       ```suggestion
           raise BadRequest(detail=str(err.messages))
   ```
   For consistency

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -441,6 +441,41 @@ paths:
         '401':
           $ref: '#/components/responses/Unauthenticated'
 
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      tags: [DAG]
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - name: only_active
+          in: query
+          schema:
+            type: boolean
+            default: true
+          required: false
+          description: |
+            Only update active DAGs.
+        - name: dag_id_pattern
+          in: query
+          schema:
+            type: string
+          required: false
+          description: |
+            If set, only update DAGs with dag_ids matching this pattern.

Review comment:
       We should have these schemas in the components/parameters section so they can be reused if the need arises.

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -441,6 +441,41 @@ paths:
         '401':
           $ref: '#/components/responses/Unauthenticated'
 
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      tags: [DAG]
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - name: only_active
+          in: query
+          schema:
+            type: boolean
+            default: true
+          required: false
+          description: |
+            Only update active DAGs.
+        - name: dag_id_pattern
+          in: query
+          schema:
+            type: string
+          required: false
+          description: |
+            If set, only update DAGs with dag_ids matching this pattern.

Review comment:
       RequestBody is needed too







[GitHub] [airflow] megan-parker commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
megan-parker commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r755355341



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -86,10 +86,13 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
 
 @security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
 @provide_session
-def patch_dag(session, dag_id, update_mask=None):
+def patch_dag(session, dag_id, update_mask=None, multi=False):

Review comment:
       Thanks for the suggestion, I agree that `patch_dags` should follow the same pattern as `get_dags`.







[GitHub] [airflow] SamWheating commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r755581047



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -108,6 +108,48 @@ def patch_dag(session, dag_id, update_mask=None):
     session.commit()
     return dag_schema.dump(dag)
 
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch multiple DAGs."""
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern:
+        dags_query = dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
+
+    editable_dags = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
+
+    dags_query = dags_query.filter(DagModel.dag_id.in_(editable_dags))
+    if tags:
+        cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
+        dags_query = dags_query.filter(or_(*cond))
+
+    total_entries = len(dags_query.all())
+
+    dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all()
+
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if len(update_mask) > 1:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        update_mask = update_mask[0]
+        if update_mask != 'is_paused':
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        patch_body_[update_mask] = patch_body[update_mask]
+        patch_body = patch_body_
+    for dag in dags:

Review comment:
       Would it be possible to do this in a single operation rather than a `for` loop? I know that MySQL doesn't support `update .. returning` queries, but maybe we could update all of the DAGs in a `query.update()` and then fetch the list separately?
   
   I wanna say we can just do something like:
   ```python
   dags_query = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit)
   
   dags_query.update({DagModel.is_paused: is_paused}, synchronize_session='fetch')
   
   dags = dags_query.all()
   ```
   
   But I don't think that the set of DAGs updated will necessarily be the same as the set of DAGs returned? I guess it depends on the isolation level of the database 🤔 
   
   Anyways, let me know if you have any suggestions, otherwise I think it's fine to leave this as an O(n) operation.
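
One way to sidestep the "updated set vs. returned set" question is to resolve the page to concrete ids first and then issue a single `UPDATE ... WHERE dag_id IN (...)`. The sketch below uses the standard-library `sqlite3` module with an illustrative table; `pause_page` is a hypothetical helper, not code from this PR.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER)")
conn.executemany("INSERT INTO dag VALUES (?, 0)", [(f"dag_{i}",) for i in range(5)])


def pause_page(conn, limit, offset, is_paused):
    # Step 1: resolve the requested page to a concrete list of ids.
    ids = [
        row[0]
        for row in conn.execute(
            "SELECT dag_id FROM dag ORDER BY dag_id LIMIT ? OFFSET ?",
            (limit, offset),
        )
    ]
    if not ids:
        return []
    # Step 2: one UPDATE for the whole page instead of one statement per DAG.
    placeholders = ", ".join("?" for _ in ids)
    conn.execute(
        f"UPDATE dag SET is_paused = ? WHERE dag_id IN ({placeholders})",
        [int(is_paused), *ids],
    )
    return ids


updated_ids = pause_page(conn, limit=2, offset=1, is_paused=True)
paused = [
    row[0]
    for row in conn.execute(
        "SELECT dag_id FROM dag WHERE is_paused = 1 ORDER BY dag_id"
    )
]
```

Because the ids are fixed before the update runs, the list returned to the caller is exactly the list that was updated.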







[GitHub] [airflow] megan-parker commented on pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
megan-parker commented on pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#issuecomment-1020126970


   @ephraimbuddy could you please review again?





[GitHub] [airflow] megan-parker commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
megan-parker commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r765899157



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -441,6 +441,41 @@ paths:
         '401':
           $ref: '#/components/responses/Unauthenticated'
 
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      tags: [DAG]
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - name: only_active
+          in: query
+          schema:
+            type: boolean
+            default: true
+          required: false
+          description: |
+            Only update active DAGs.
+        - name: dag_id_pattern
+          in: query
+          schema:
+            type: string
+          required: false
+          description: |
+            If set, only update DAGs with dag_ids matching this pattern.

Review comment:
       @ashb I modified both the `get` and `patch` bulk endpoints. `~` can be supplied to return all DAGs if needed.
   I did it this way because `dag_id_pattern` is a reusable parameter, as per this thread. I am thinking of defining `dag_id_pattern` separately in both bulk endpoints, making it required in `patch` and optional in `get`. WDYT?







[GitHub] [airflow] ashb commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r765895225



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,38 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
-          in: query
-          schema:
-            type: boolean
-            default: true
-          required: false
-          description: |
-            Only return active DAGs.
-
-            *New in version 2.1.1*
-        - name: dag_id_pattern
-          in: query
-          schema:
-            type: string
-          required: false
-          description: |
-            If set, only return DAGs with dag_ids matching this pattern.
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
-            *New in version 2.3.0*
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'

Review comment:
       Yeah, the behaviour of bulk `get` shouldn't change.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r822326341



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,50 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
+        - $ref: '#/components/parameters/OnlyActive'
+        - name: dag_id_pattern
           in: query
           schema:
-            type: boolean
-            default: true
+            type: string
           required: false
           description: |
-            Only return active DAGs.
+            If set, only return DAGs with dag_ids matching this pattern.

Review comment:
       We should remove the `dag_id_pattern` change on the `/dags` endpoint from this PR and make it a separate PR, with tests. Currently there's no test for it, so it would be better to make this change separately.

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,50 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
+        - $ref: '#/components/parameters/OnlyActive'
+        - name: dag_id_pattern
           in: query
           schema:
-            type: boolean
-            default: true
+            type: string
           required: false
           description: |
-            Only return active DAGs.
+            If set, only return DAGs with dag_ids matching this pattern.
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
-            *New in version 2.1.1*
+    patch:
+      summary: Update DAGs

Review comment:
       Add a `description:` field and also note that this is new in 2.3.0.

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,50 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
+        - $ref: '#/components/parameters/OnlyActive'
+        - name: dag_id_pattern
           in: query
           schema:
-            type: boolean
-            default: true
+            type: string
           required: false
           description: |
-            Only return active DAGs.
+            If set, only return DAGs with dag_ids matching this pattern.
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
-            *New in version 2.1.1*

Review comment:
       This shouldn't be removed or am I missing something? Looks like it should be at 3814 or in a description field under `/dags`

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,50 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
+        - $ref: '#/components/parameters/OnlyActive'
+        - name: dag_id_pattern
           in: query
           schema:
-            type: boolean
-            default: true
+            type: string
           required: false
           description: |
-            Only return active DAGs.
+            If set, only return DAGs with dag_ids matching this pattern.
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
-            *New in version 2.1.1*
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - $ref: '#/components/parameters/OnlyActive'
         - name: dag_id_pattern
           in: query
           schema:
             type: string
-          required: false
+          required: true
           description: |
             If set, only return DAGs with dag_ids matching this pattern.

Review comment:
       ```suggestion
               If set, only update DAGs with dag_ids matching this pattern.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r763891512



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,38 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
-          in: query
-          schema:
-            type: boolean
-            default: true
-          required: false
-          description: |
-            Only return active DAGs.
-
-            *New in version 2.1.1*
-        - name: dag_id_pattern
-          in: query
-          schema:
-            type: string
-          required: false
-          description: |
-            If set, only return DAGs with dag_ids matching this pattern.
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
-            *New in version 2.3.0*
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'

Review comment:
       This endpoint operates on _all_ DAGs by default, right? That feels like a bit of a foot gun, and I _think_ I'd like to make `dag_id_pattern` required here, i.e. `?dag_id_pattern=%` is required to operate on all DAGs.
   
   WDYT?
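
To make the proposal concrete, here is a minimal sketch of how a client might build such a request if `dag_id_pattern` were mandatory. The base URL and the `build_bulk_pause_request` helper are hypothetical, the query and body shape are assumptions based on the `UpdateMask` and `dag_id_pattern` parameters in the diff above, and nothing is actually sent.

```python
import json
from urllib.parse import urlencode

# Hypothetical base URL; adjust for your own deployment.
BASE_URL = "http://localhost:8080/api/v1"


def build_bulk_pause_request(dag_id_pattern, is_paused=True):
    """Return (method, url, body) for a bulk pause/resume call.

    Raises ValueError when no pattern is given, so the caller has to opt in
    explicitly (for example with "%") before touching every DAG.
    """
    if not dag_id_pattern:
        raise ValueError("dag_id_pattern is required for bulk updates")
    # urlencode percent-encodes the pattern, so "%" becomes "%25".
    query = urlencode({"dag_id_pattern": dag_id_pattern, "update_mask": "is_paused"})
    body = json.dumps({"is_paused": is_paused})
    return "PATCH", f"{BASE_URL}/dags?{query}", body
```

Note that a literal `%` has to be percent-encoded as `%25` in the query string, which `urlencode` takes care of.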







[GitHub] [airflow] megan-parker commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
megan-parker commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r764177013



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,38 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
-          in: query
-          schema:
-            type: boolean
-            default: true
-          required: false
-          description: |
-            Only return active DAGs.
-
-            *New in version 2.1.1*
-        - name: dag_id_pattern
-          in: query
-          schema:
-            type: string
-          required: false
-          description: |
-            If set, only return DAGs with dag_ids matching this pattern.
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
-            *New in version 2.3.0*
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/FilterTags'
+        - $ref: '#/components/parameters/UpdateMask'
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'

Review comment:
       Yes, I agree; I can add that fix in this PR. WDYT about setting `dag_id_pattern` to `~` to select all DAGs if `dag_id_pattern` is required?
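
One way to read this proposal, sketched in plain Python: `~` selects every DAG, and any other value is treated as a case-insensitive substring match. The `select_dag_ids` helper and the substring semantics are assumptions for illustration; a real endpoint would presumably filter in the database query instead of in a list.

```python
def select_dag_ids(all_dag_ids, dag_id_pattern):
    """Pick the DAG ids a bulk update would touch (illustrative only)."""
    if not dag_id_pattern:
        # The pattern is required, so an empty value is rejected outright.
        raise ValueError("dag_id_pattern is required")
    if dag_id_pattern == "~":
        # Explicit opt-in: operate on every DAG.
        return list(all_dag_ids)
    needle = dag_id_pattern.lower()
    return [dag_id for dag_id in all_dag_ids if needle in dag_id.lower()]
```

With this shape, updating everything is always a deliberate choice by the caller and never the result of a forgotten parameter.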







[GitHub] [airflow] SamWheating commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r755335660



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -86,10 +86,13 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
 
 @security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
 @provide_session
-def patch_dag(session, dag_id, update_mask=None):
+def patch_dag(session, dag_id, update_mask=None, multi=False):

Review comment:
       I think that this sounds like a good solution, but I would suggest that we still include a `dag_id_pattern` argument for filtering DAG IDs (which would then be consistent with the `GET /dags` endpoint)







[GitHub] [airflow] github-actions[bot] commented on pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#issuecomment-1064498987


   The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ephraimbuddy merged pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #19758:
URL: https://github.com/apache/airflow/pull/19758


   





[GitHub] [airflow] SamWheating commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r755367547



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -108,6 +108,48 @@ def patch_dag(session, dag_id, update_mask=None):
     session.commit()
     return dag_schema.dump(dag)
 
+@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pattern=None, update_mask=None):
+    """Patch all DAGs."""

Review comment:
       ```suggestion
       """Patch multiple DAGs."""
   ```







[GitHub] [airflow] SamWheating commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r755363062



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -441,6 +441,46 @@ paths:
         '401':
           $ref: '#/components/responses/Unauthenticated'
 
+    patch:
+      summary: Update DAGs
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
+      operationId: patch_dags
+      tags: [DAG]
+      parameters:
+        - $ref: '#/components/parameters/PageLimit'
+        - $ref: '#/components/parameters/PageOffset'
+        - $ref: '#/components/parameters/OrderBy'

Review comment:
       ```suggestion
   ```
   
   Remove this line, since the `order_by` argument isn't accepted by the function.







[GitHub] [airflow] uranusjr commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r756612051



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -88,25 +88,68 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
 @provide_session
 def patch_dag(session, dag_id, update_mask=None):
     """Update the specific DAG"""
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if update_mask != ['is_paused']:
+            raise BadRequest(detail="Only `is_paused` field can be updated through the REST API")
+        update_mask = update_mask[0]
+        patch_body_[update_mask] = patch_body[update_mask]
+        patch_body = patch_body_
     dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one_or_none()
     if not dag:
         raise NotFound(f"Dag with id: '{dag_id}' not found")
+    setattr(dag, 'is_paused', patch_body['is_paused'])

Review comment:
       Any reason why this is not
   
   ```suggestion
       dag.is_paused = patch_body['is_paused']
   ```
   
   instead?
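
For a fixed attribute name the two spellings behave the same, as this small sketch shows. `FakeDag` is a hypothetical stand-in for `DagModel`, not the real model class.

```python
class FakeDag:
    """Hypothetical stand-in for DagModel with a single flag."""

    def __init__(self):
        self.is_paused = False


via_setattr = FakeDag()
via_assignment = FakeDag()

# setattr with a literal attribute name ...
setattr(via_setattr, "is_paused", True)
# ... has the same effect as plain attribute assignment.
via_assignment.is_paused = True

same_result = via_setattr.is_paused == via_assignment.is_paused
```

`setattr` is mainly useful when the attribute name is only known at runtime, for example when it is taken from `update_mask`.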







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r757137427



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -445,27 +428,22 @@ paths:
       summary: Update DAGs
       x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint
       operationId: patch_dags
-      tags: [DAG]
       parameters:
         - $ref: '#/components/parameters/PageLimit'
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/FilterTags'
         - $ref: '#/components/parameters/UpdateMask'
-        - name: only_active
-          in: query
-          schema:
-            type: boolean
-            default: true
-          required: false
-          description: |
-            Only update active DAGs.
-        - name: dag_id_pattern
-          in: query
-          schema:
-            type: string
-          required: false
-          description: |
-            If set, only update DAGs with dag_ids matching this pattern.
+        - $ref: '#/components/parameters/OnlyActive'
+        - $ref: '#/components/parameters/DagIdPattern'
+      tags: [DAG]
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/DAGCollection'

Review comment:
       ```suggestion
                 $ref: '#/components/schemas/DAG'
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r754953276



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -86,10 +86,13 @@ def get_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_patte
 
 @security.requires_access([(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG)])
 @provide_session
-def patch_dag(session, dag_id, update_mask=None):
+def patch_dag(session, dag_id, update_mask=None, multi=False):

Review comment:
       Maybe a `POST` to `/dags/~/patch` like `get_dag_runs_batch`, or just sending a `PATCH` to `/dags/`, would be a good idea. Not sure. But this is not the right solution.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r823439581



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -404,25 +408,56 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
+        - $ref: '#/components/parameters/OnlyActive'
+        - name: dag_id_pattern
           in: query
           schema:
-            type: boolean
-            default: true
+            type: string
           required: false
           description: |
-            Only return active DAGs.
+            If set, only return DAGs with dag_ids matching this pattern.
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGCollection'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
-            *New in version 2.1.1*
+    patch:
+      summary: Update DAGs
+      description: >
+       Update DAGs of a given dag_id_pattern using UpdateMask.
+
+       This endpoint allows specifying `~` as the dag_id_pattern to retrieve DAG runs for all DAGs.

Review comment:
       ```suggestion
          This endpoint allows specifying `~` as the dag_id_pattern to update all DAGs.
   ```







[GitHub] [airflow] megan-parker commented on a change in pull request #19758: Api add support bulk pause resume

Posted by GitBox <gi...@apache.org>.
megan-parker commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r822859204



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -412,25 +412,50 @@ paths:
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
         - $ref: '#/components/parameters/FilterTags'
-        - name: only_active
+        - $ref: '#/components/parameters/OnlyActive'
+        - name: dag_id_pattern
           in: query
           schema:
-            type: boolean
-            default: true
+            type: string
           required: false
           description: |
-            Only return active DAGs.
+            If set, only return DAGs with dag_ids matching this pattern.

Review comment:
       I didn't add `dag_id_pattern`; it was added by @SamWheating in [this](https://github.com/apache/airflow/pull/18924/files) PR.
   I can see that the field already exists in the [main](https://github.com/apache/airflow/blob/a4d514bd7f7777e5bf03e9fa8e567f1c8774a74d/airflow/api_connexion/openapi/v1.yaml#L417) branch. It looks like the git diff is [misleading](https://github.com/apache/airflow/blame/bc8d9ba119ea7112a25d67aa70a6ea6e5069052b/airflow/api_connexion/openapi/v1.yaml#L412).



