Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/28 17:46:08 UTC

[GitHub] [airflow] takunnithan opened a new pull request #9556: WIP : API Endpoint - Batch #9112

takunnithan opened a new pull request #9556:
URL: https://github.com/apache/airflow/pull/9556


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Target Github ISSUE in description if exists
   - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448834084



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -346,6 +346,243 @@ def test_end_date_gte_lte(self, url, expected_dag_run_ids, session):
         self.assertEqual(dag_run_ids, expected_dag_run_ids)
 
 
+class TestGetDagRunBatch(TestDagRunEndpoint):
+    @provide_session
+    def test_should_respond_200(self, session):
+        dag_runs = self._create_test_dag_run()
+        session.add_all(dag_runs)
+        session.commit()
+        payload = {
+            "dag_ids": ["TEST_DAG_ID"],
+        }
+        response = self.client.post("api/v1/dags/~/dagRuns/list", json=payload)
+        assert response.status_code == 200
+        self.assertEqual(

Review comment:
       Should we use the same style of asserts in tests? I would opt for `assert x == y`.
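
   For illustration, a minimal sketch of the two styles (a generic example, not code from this PR) -- under pytest, assertion rewriting still reports both operands on failure, so the plain `assert` loses no diagnostics:

   ```
   import unittest

   class ExampleTest(unittest.TestCase):
       def test_unittest_style(self):
           self.assertEqual(2 + 2, 4)

       def test_pytest_style(self):
           # pytest rewrites this assert to show both operands on failure
           assert 2 + 2 == 4
   ```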







[GitHub] [airflow] turbaszek commented on pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#issuecomment-656211435


   @takunnithan could you please take a look at the failed CI builds?





[GitHub] [airflow] takunnithan commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r449696389



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema):
     total_entries = fields.Int()
 
 
+class DagRunsBatchFormSchema(Schema):
+    """ Schema to validate and deserialize the Form(request payload) submitted to DagRun Batch endpoint"""
+
+    class Meta:
+        """ Meta """
+        datetimeformat = 'iso'
+        strict = True
+
+    page_offset = fields.Int(required=False, missing=0, min=0)
+    page_limit = fields.Int(required=False, missing=100, min=1)
+    dag_ids = fields.List(fields.Str(), required=False, missing=None)
+    execution_date_gte = fields.DateTime(required=False, missing=None)

Review comment:
       Thanks @ephraimbuddy. I have removed `required=False` from the args, but without `missing=None` the fields were missing from the deserialized data.
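
   A minimal sketch of that behaviour (assuming marshmallow 3, where `load` returns a plain dict; `ExampleFormSchema` is hypothetical):

   ```
   from marshmallow import Schema, fields

   class ExampleFormSchema(Schema):
       # an absent key still appears in the result, defaulted to None
       execution_date_gte = fields.DateTime(missing=None)
       # without missing=..., an absent key is dropped from the result entirely
       start_date_gte = fields.DateTime()

   print(ExampleFormSchema().load({}))  # {'execution_date_gte': None}
   ```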







[GitHub] [airflow] takunnithan commented on pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#issuecomment-652159554


   @ephraimbuddy - **Thank you** for taking the time to review the PR and give valuable suggestions. I followed the marshmallow schema approach you suggested, and also added more test cases.





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#issuecomment-657419912


   Awesome work, congrats on your first merged pull request!
   





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #9556: WIP : API Endpoint - Batch #9112

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#issuecomment-650799249


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally; it's a heavy Docker environment, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://apache-airflow-slack.herokuapp.com/
   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: WIP : API Endpoint - Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r447160605



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -67,40 +67,72 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
     if dag_id != '~':
         query = query.filter(DagRun.dag_id == dag_id)
 
+    dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte,
+                                             execution_date_lte, start_date_gte, start_date_lte,
+                                             limit, offset)
+
+    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+                                                          total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+                    execution_date_gte, execution_date_lte,
+                    start_date_gte, start_date_lte, limit, offset):
+
+    query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                         execution_date_lte, start_date_gte, start_date_lte)
+    # apply offset and limit
+    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+    total_entries = session.query(func.count(DagRun.id)).scalar()
+    return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                 execution_date_lte, start_date_gte, start_date_lte):
     # filter start date
     if start_date_gte:
         query = query.filter(DagRun.start_date >= start_date_gte)
-
     if start_date_lte:
         query = query.filter(DagRun.start_date <= start_date_lte)
-
     # filter execution date
     if execution_date_gte:
         query = query.filter(DagRun.execution_date >= execution_date_gte)
-
     if execution_date_lte:
         query = query.filter(DagRun.execution_date <= execution_date_lte)
-
     # filter end date
     if end_date_gte:
         query = query.filter(DagRun.end_date >= end_date_gte)
-
     if end_date_lte:
         query = query.filter(DagRun.end_date <= end_date_lte)
+    return query
 
-    # apply offset and limit
-    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-    total_entries = session.query(func.count(DagRun.id)).scalar()
-
-    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
-                                                          total_entries=total_entries))
 
-
-def get_dag_runs_batch():
+@format_parameters({
+    'start_date_gte': format_datetime,
+    'start_date_lte': format_datetime,
+    'execution_date_gte': format_datetime,
+    'execution_date_lte': format_datetime,
+    'end_date_gte': format_datetime,
+    'end_date_lte': format_datetime,
+})
+@provide_session
+def get_dag_runs_batch(session, dag_ids, start_date_gte=None, start_date_lte=None,
+                       execution_date_gte=None, execution_date_lte=None,
+                       end_date_gte=None, end_date_lte=None, offset=None, limit=None):
     """
     Get list of DAG Runs
     """

Review comment:
       ```
   body = request.get_json()
   try:
       data = list_dag_runs_form_schema.load(body)
   except ValidationError as err:
       raise BadRequest(detail=err.messages)
   ```







[GitHub] [airflow] takunnithan commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r449696774



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -346,6 +346,243 @@ def test_end_date_gte_lte(self, url, expected_dag_run_ids, session):
         self.assertEqual(dag_run_ids, expected_dag_run_ids)
 
 
+class TestGetDagRunBatch(TestDagRunEndpoint):
+    @provide_session
+    def test_should_respond_200(self, session):
+        dag_runs = self._create_test_dag_run()
+        session.add_all(dag_runs)
+        session.commit()
+        payload = {
+            "dag_ids": ["TEST_DAG_ID"],
+        }
+        response = self.client.post("api/v1/dags/~/dagRuns/list", json=payload)
+        assert response.status_code == 200
+        self.assertEqual(

Review comment:
       Yes. Replaced `assertEqual` with `assert`







[GitHub] [airflow] mik-laj commented on a change in pull request #9556: WIP : API Endpoint - Batch

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r447430621



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -346,6 +346,55 @@ def test_end_date_gte_lte(self, url, expected_dag_run_ids, session):
         self.assertEqual(dag_run_ids, expected_dag_run_ids)
 
 
+class TestGetDagRunBatch(TestDagRunEndpoint):
+    @provide_session
+    def test_should_respond_200(self, session):
+        dag_runs = self._create_test_dag_run()
+        session.add_all(dag_runs)
+        session.commit()
+        payload = {
+            "page_offset": 0,
+            "page_limit": 10,
+            "dag_ids": ["TEST_DAG_I"],
+            "execution_date_gte": "2020-06-24T19:54:56Z",
+            "execution_date_lte": self.default_time,
+            "start_date_gte": "2020-06-24T19:54:56Z",
+            "start_date_lte": self.default_time,
+            "end_date_gte": "2020-06-24T19:54:56Z",
+            "end_date_lte": self.default_time
+        }
+        response = self.client.post("api/v1/dags/~/dagRuns/list", json=payload)
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.default_time,
+                        'external_trigger': True,
+                        'start_date': self.default_time,
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.default_time_2,
+                        'external_trigger': True,
+                        'start_date': self.default_time,
+                        'conf': {},
+                    },
+                ],
+                "total_entries": 2,
+            }
+        )
+
+

Review comment:
       Parameters for the view function are only passed automatically when they come from the URL. In this case, the parameters are passed in the request body, so you need to handle them in the view.
   https://connexion.readthedocs.io/en/latest/request.html#automatic-parameter-handling
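
   A minimal sketch of the distinction (assuming a connexion app backed by Flask; names are illustrative, not the PR's final code):

   ```
   from flask import request

   # Path/query parameters declared in the OpenAPI spec are injected
   # as view arguments by connexion:
   def get_dag_runs(dag_id, limit=None, offset=None):
       ...

   # A JSON request body is not unpacked into arguments; the view
   # reads it itself via the request proxy:
   def get_dag_runs_batch():
       body = request.get_json()
       ...
   ```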







[GitHub] [airflow] turbaszek commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r449756594



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -68,40 +70,69 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
     if dag_id != '~':
         query = query.filter(DagRun.dag_id == dag_id)
 
+    dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte,
+                                             execution_date_lte, start_date_gte, start_date_lte,
+                                             limit, offset)
+
+    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+                                                          total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+                    execution_date_gte, execution_date_lte,
+                    start_date_gte, start_date_lte, limit, offset):
+
+    query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                         execution_date_lte, start_date_gte, start_date_lte)
+    # apply offset and limit
+    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+    total_entries = session.query(func.count(DagRun.id)).scalar()
+    return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                 execution_date_lte, start_date_gte, start_date_lte):
     # filter start date
     if start_date_gte:
         query = query.filter(DagRun.start_date >= start_date_gte)
-
     if start_date_lte:
         query = query.filter(DagRun.start_date <= start_date_lte)
-
     # filter execution date
     if execution_date_gte:
         query = query.filter(DagRun.execution_date >= execution_date_gte)
-
     if execution_date_lte:
         query = query.filter(DagRun.execution_date <= execution_date_lte)
-
     # filter end date
     if end_date_gte:
         query = query.filter(DagRun.end_date >= end_date_gte)
-
     if end_date_lte:
         query = query.filter(DagRun.end_date <= end_date_lte)
+    return query
 
-    # apply offset and limit
-    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-    total_entries = session.query(func.count(DagRun.id)).scalar()
-
-    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
-                                                          total_entries=total_entries))
 
-
-def get_dag_runs_batch():
+@provide_session
+def get_dag_runs_batch(session):
     """
     Get list of DAG Runs
     """
-    raise NotImplementedError("Not implemented yet.")
+    body = request.get_json()
+    try:
+        data = dagruns_batch_form_schema.load(body).data
+    except ValidationError as err:
+        raise BadRequest(detail=err.messages)
+
+    query = session.query(DagRun)
+
+    if data["dag_ids"]:
+        query = query.filter(DagRun.dag_id.in_(data["dag_ids"]))
+
+    dag_runs, total_entries = _fetch_dag_runs(query, session, data["end_date_gte"], data["end_date_lte"],
+                                              data["execution_date_gte"], data["execution_date_lte"],
+                                              data["start_date_gte"], data["start_date_lte"],
+                                              data["page_limit"], data["page_offset"])

Review comment:
       In `api_connexion/parameters.py` we have some hardcoded params like those. Should we extend the list and use them here instead of using strings?
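
   A sketch of what shared constants might look like (hypothetical names, not the actual contents of `parameters.py`):

   ```
   # api_connexion/parameters.py (hypothetical additions)
   PAGE_LIMIT = "page_limit"
   PAGE_OFFSET = "page_offset"

   # dag_run_endpoint.py would then refer to the constants, e.g.
   # from airflow.api_connexion.parameters import PAGE_LIMIT, PAGE_OFFSET
   data = {"page_limit": 100, "page_offset": 0}
   limit, offset = data[PAGE_LIMIT], data[PAGE_OFFSET]
   ```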







[GitHub] [airflow] mik-laj merged pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #9556:
URL: https://github.com/apache/airflow/pull/9556


   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r452434706



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -87,41 +87,68 @@ def get_dag_runs(
     if dag_id != "~":
         query = query.filter(DagRun.dag_id == dag_id)
 
+    dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte,
+                                             execution_date_lte, start_date_gte, start_date_lte,
+                                             limit, offset)
+
+    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+                                                          total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+                    execution_date_gte, execution_date_lte,
+                    start_date_gte, start_date_lte, limit, offset):
+    query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                         execution_date_lte, start_date_gte, start_date_lte)
+    # apply offset and limit
+    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+    total_entries = session.query(func.count(DagRun.id)).scalar()
+    return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                 execution_date_lte, start_date_gte, start_date_lte):
     # filter start date
     if start_date_gte:
         query = query.filter(DagRun.start_date >= start_date_gte)
-
     if start_date_lte:
         query = query.filter(DagRun.start_date <= start_date_lte)
-
     # filter execution date
     if execution_date_gte:
         query = query.filter(DagRun.execution_date >= execution_date_gte)
-
     if execution_date_lte:
         query = query.filter(DagRun.execution_date <= execution_date_lte)
-
     # filter end date
     if end_date_gte:
         query = query.filter(DagRun.end_date >= end_date_gte)
-
     if end_date_lte:
         query = query.filter(DagRun.end_date <= end_date_lte)
+    return query
 
-    # apply offset and limit
-    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-    total_entries = session.query(func.count(DagRun.id)).scalar()
 
-    return dagrun_collection_schema.dump(
-        DAGRunCollection(dag_runs=dag_run, total_entries=total_entries)
-    )
-
-
-def get_dag_runs_batch():
+@provide_session
+def get_dag_runs_batch(session):
     """
     Get list of DAG Runs
     """
-    raise NotImplementedError("Not implemented yet.")
+    body = request.get_json()
+    try:
+        data = dagruns_batch_form_schema.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=err.messages)

Review comment:
       ```suggestion
           raise BadRequest(detail=str(err.messages))
   ```







[GitHub] [airflow] takunnithan commented on pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#issuecomment-657182733


   @turbaszek Re-ran CI builds with an empty commit. 





[GitHub] [airflow] takunnithan commented on pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#issuecomment-656497320


   @turbaszek I have fixed the merge conflicts and test cases, but one of the CI checks is failing:
   ```
   Error: some of the CI environment failed to initialize!
   ERROR! Maximum number of retries while checking rabbitmq integration. Exiting
   ```
   Is there a way to re-run just the failed check?





[GitHub] [airflow] OmairK commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
OmairK commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448800768



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema):
     total_entries = fields.Int()
 
 
+class DagRunsBatchFormSchema(Schema):
+    """ Schema to validate and deserialize the Form(request payload) submitted to DagRun Batch endpoint"""
+
+    class Meta:
+        """ Meta """
+        datetimeformat = 'iso'
+        strict = True
+
+    page_offset = fields.Int(required=False, missing=0, min=0)
+    page_limit = fields.Int(required=False, missing=100, min=1)

Review comment:
       ```suggestion
       page_limit = fields.Int(required=False, missing=100, min=1, max=100)
   ```
   
   It would be better to cap the limit at 100. @ephraimbuddy @takunnithan what do you think?
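
   As an aside, marshmallow treats bare `min=`/`max=` keyword arguments as field metadata rather than validation, so an enforced cap would normally go through a validator. A sketch under that assumption (not the PR's code):

   ```
   from marshmallow import Schema, fields, validate

   class PageFormSchema(Schema):
       # Range is checked at load time; out-of-range values raise ValidationError
       page_limit = fields.Int(missing=100, validate=validate.Range(min=1, max=100))
   ```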







[GitHub] [airflow] takunnithan commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r449696638



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -346,6 +346,243 @@ def test_end_date_gte_lte(self, url, expected_dag_run_ids, session):
         self.assertEqual(dag_run_ids, expected_dag_run_ids)
 
 
+class TestGetDagRunBatch(TestDagRunEndpoint):
+    @provide_session
+    def test_should_respond_200(self, session):
+        dag_runs = self._create_test_dag_run()
+        session.add_all(dag_runs)
+        session.commit()

Review comment:
       Thanks @turbaszek. This is addressed in the latest commit.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: WIP : API Endpoint - Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r447159916



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -67,40 +67,72 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
     if dag_id != '~':
         query = query.filter(DagRun.dag_id == dag_id)
 
+    dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte,
+                                             execution_date_lte, start_date_gte, start_date_lte,
+                                             limit, offset)
+
+    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+                                                          total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+                    execution_date_gte, execution_date_lte,
+                    start_date_gte, start_date_lte, limit, offset):
+
+    query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                         execution_date_lte, start_date_gte, start_date_lte)
+    # apply offset and limit
+    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+    total_entries = session.query(func.count(DagRun.id)).scalar()
+    return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                 execution_date_lte, start_date_gte, start_date_lte):
     # filter start date
     if start_date_gte:
         query = query.filter(DagRun.start_date >= start_date_gte)
-
     if start_date_lte:
         query = query.filter(DagRun.start_date <= start_date_lte)
-
     # filter execution date
     if execution_date_gte:
         query = query.filter(DagRun.execution_date >= execution_date_gte)
-
     if execution_date_lte:
         query = query.filter(DagRun.execution_date <= execution_date_lte)
-
     # filter end date
     if end_date_gte:
         query = query.filter(DagRun.end_date >= end_date_gte)
-
     if end_date_lte:
         query = query.filter(DagRun.end_date <= end_date_lte)
+    return query
 
-    # apply offset and limit
-    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-    total_entries = session.query(func.count(DagRun.id)).scalar()
-
-    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
-                                                          total_entries=total_entries))
 
-
-def get_dag_runs_batch():
+@format_parameters({
+    'start_date_gte': format_datetime,
+    'start_date_lte': format_datetime,
+    'execution_date_gte': format_datetime,
+    'execution_date_lte': format_datetime,
+    'end_date_gte': format_datetime,
+    'end_date_lte': format_datetime,
+})
+@provide_session
+def get_dag_runs_batch(session, dag_ids, start_date_gte=None, start_date_lte=None,
+                       execution_date_gte=None, execution_date_lte=None,
+                       end_date_gte=None, end_date_lte=None, offset=None, limit=None):

Review comment:
       ```suggestion
   def get_dag_runs_batch(session):
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r449756755



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -68,40 +70,69 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
     if dag_id != '~':
         query = query.filter(DagRun.dag_id == dag_id)
 
+    dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte,
+                                             execution_date_lte, start_date_gte, start_date_lte,
+                                             limit, offset)
+
+    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+                                                          total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+                    execution_date_gte, execution_date_lte,
+                    start_date_gte, start_date_lte, limit, offset):
+
+    query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                         execution_date_lte, start_date_gte, start_date_lte)
+    # apply offset and limit
+    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+    total_entries = session.query(func.count(DagRun.id)).scalar()
+    return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                 execution_date_lte, start_date_gte, start_date_lte):
     # filter start date
     if start_date_gte:
         query = query.filter(DagRun.start_date >= start_date_gte)
-
     if start_date_lte:
         query = query.filter(DagRun.start_date <= start_date_lte)
-
     # filter execution date
     if execution_date_gte:
         query = query.filter(DagRun.execution_date >= execution_date_gte)
-
     if execution_date_lte:
         query = query.filter(DagRun.execution_date <= execution_date_lte)
-
     # filter end date
     if end_date_gte:
         query = query.filter(DagRun.end_date >= end_date_gte)
-
     if end_date_lte:
         query = query.filter(DagRun.end_date <= end_date_lte)
+    return query
 
-    # apply offset and limit
-    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-    total_entries = session.query(func.count(DagRun.id)).scalar()
-
-    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
-                                                          total_entries=total_entries))
 
-
-def get_dag_runs_batch():
+@provide_session
+def get_dag_runs_batch(session):
     """
     Get list of DAG Runs
     """
-    raise NotImplementedError("Not implemented yet.")
+    body = request.get_json()
+    try:
+        data = dagruns_batch_form_schema.load(body).data
+    except ValidationError as err:
+        raise BadRequest(detail=err.messages)
+
+    query = session.query(DagRun)
+
+    if data["dag_ids"]:
+        query = query.filter(DagRun.dag_id.in_(data["dag_ids"]))
+
+    dag_runs, total_entries = _fetch_dag_runs(query, session, data["end_date_gte"], data["end_date_lte"],
+                                              data["execution_date_gte"], data["execution_date_lte"],
+                                              data["start_date_gte"], data["start_date_lte"],
+                                              data["page_limit"], data["page_offset"])

Review comment:
       It no longer exists but can be added back.







[GitHub] [airflow] takunnithan commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r449701317



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -80,15 +80,15 @@ class Meta:
         datetimeformat = 'iso'
         strict = True
 
-    page_offset = fields.Int(required=False, missing=0, min=0)
-    page_limit = fields.Int(required=False, missing=100, min=1)
-    dag_ids = fields.List(fields.Str(), required=False, missing=None)
-    execution_date_gte = fields.DateTime(required=False, missing=None)
-    execution_date_lte = fields.DateTime(required=False, missing=None)
-    start_date_gte = fields.DateTime(required=False, missing=None)
-    start_date_lte = fields.DateTime(required=False, missing=None)
-    end_date_gte = fields.DateTime(required=False, missing=None)
-    end_date_lte = fields.DateTime(required=False, missing=None)
+    page_offset = fields.Int(missing=0, min=0)
+    page_limit = fields.Int(missing=100, min=1)
+    dag_ids = fields.List(fields.Str(), missing=None)
+    execution_date_gte = fields.DateTime(missing=None)
+    execution_date_lte = fields.DateTime(missing=None)
+    start_date_gte = fields.DateTime(missing=None)
+    start_date_lte = fields.DateTime(missing=None)
+    end_date_gte = fields.DateTime(missing=None)
+    end_date_lte = fields.DateTime(missing=None)

Review comment:
       The fields were missing from the deserialized data when `missing=None` was removed.







[GitHub] [airflow] turbaszek commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r449759016



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -68,40 +70,69 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
     if dag_id != '~':
         query = query.filter(DagRun.dag_id == dag_id)
 
+    dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte,
+                                             execution_date_lte, start_date_gte, start_date_lte,
+                                             limit, offset)
+
+    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+                                                          total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+                    execution_date_gte, execution_date_lte,
+                    start_date_gte, start_date_lte, limit, offset):
+
+    query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                         execution_date_lte, start_date_gte, start_date_lte)
+    # apply offset and limit
+    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+    total_entries = session.query(func.count(DagRun.id)).scalar()
+    return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                 execution_date_lte, start_date_gte, start_date_lte):
     # filter start date
     if start_date_gte:
         query = query.filter(DagRun.start_date >= start_date_gte)
-
     if start_date_lte:
         query = query.filter(DagRun.start_date <= start_date_lte)
-
     # filter execution date
     if execution_date_gte:
         query = query.filter(DagRun.execution_date >= execution_date_gte)
-
     if execution_date_lte:
         query = query.filter(DagRun.execution_date <= execution_date_lte)
-
     # filter end date
     if end_date_gte:
         query = query.filter(DagRun.end_date >= end_date_gte)
-
     if end_date_lte:
         query = query.filter(DagRun.end_date <= end_date_lte)
+    return query
 
-    # apply offset and limit
-    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-    total_entries = session.query(func.count(DagRun.id)).scalar()
-
-    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
-                                                          total_entries=total_entries))
 
-
-def get_dag_runs_batch():
+@provide_session
+def get_dag_runs_batch(session):
     """
     Get list of DAG Runs
     """
-    raise NotImplementedError("Not implemented yet.")
+    body = request.get_json()
+    try:
+        data = dagruns_batch_form_schema.load(body).data
+    except ValidationError as err:
+        raise BadRequest(detail=err.messages)
+
+    query = session.query(DagRun)
+
+    if data["dag_ids"]:
+        query = query.filter(DagRun.dag_id.in_(data["dag_ids"]))
+
+    dag_runs, total_entries = _fetch_dag_runs(query, session, data["end_date_gte"], data["end_date_lte"],
+                                              data["execution_date_gte"], data["execution_date_lte"],
+                                              data["start_date_gte"], data["start_date_lte"],
+                                              data["page_limit"], data["page_offset"])

Review comment:
       Oh indeed, I'm ok with not adding this back







[GitHub] [airflow] takunnithan commented on pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#issuecomment-656214257


   @turbaszek Yes, I am fixing it. There was a mistake in resolving the merge conflict.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448810142



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema):
     total_entries = fields.Int()
 
 
+class DagRunsBatchFormSchema(Schema):
+    """ Schema to validate and deserialize the Form(request payload) submitted to DagRun Batch endpoint"""
+
+    class Meta:
+        """ Meta """
+        datetimeformat = 'iso'
+        strict = True
+
+    page_offset = fields.Int(required=False, missing=0, min=0)
+    page_limit = fields.Int(required=False, missing=100, min=1)

Review comment:
       I think it's better left out. Some servers can handle a page limit of more than 1000. I'm working on a configurable page request limit that we can apply here; then it won't be a hard limit but one that users can configure.
   You can see the commit here: https://github.com/apache/airflow/commit/a37ac22c179e5dccc95b05c862bffc542dad125e
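
   A sketch of how such a configurable cap might be applied (assuming a hypothetical `[api] maximum_page_limit` option; the linked commit is the authoritative version):

   ```
   from airflow.configuration import conf

   def check_limit(value):
       # clamp the requested page size to the configured maximum
       max_limit = conf.getint("api", "maximum_page_limit", fallback=100)
       return min(value, max_limit)
   ```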







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448256015



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema):
     total_entries = fields.Int()
 
 
+class DagRunsBatchFormSchema(Schema):
+    """ Schema to validate and deserialize the Form(request payload) submitted to DagRun Batch endpoint"""
+
+    class Meta:
+        """ Meta """
+        datetimeformat = 'iso'
+        strict = True
+
+    page_offset = fields.Int(required=False, missing=0, min=0)
+    page_limit = fields.Int(required=False, missing=100, min=1)
+    dag_ids = fields.List(fields.Str(), required=False, missing=None)
+    execution_date_gte = fields.DateTime(required=False, missing=None)

Review comment:
       ```suggestion
       execution_date_gte = fields.DateTime()
   ```
    I suggest you remove the `missing` and `required` arguments, and only use them when you need something other than marshmallow's defaults (`required` is already `False` by default).
   https://marshmallow.readthedocs.io/en/2.x-line/api_reference.html#module-marshmallow.fields







[GitHub] [airflow] takunnithan commented on a change in pull request #9556: WIP : API Endpoint - Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r447152188



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -346,6 +346,55 @@ def test_end_date_gte_lte(self, url, expected_dag_run_ids, session):
         self.assertEqual(dag_run_ids, expected_dag_run_ids)
 
 
+class TestGetDagRunBatch(TestDagRunEndpoint):
+    @provide_session
+    def test_should_respond_200(self, session):
+        dag_runs = self._create_test_dag_run()
+        session.add_all(dag_runs)
+        session.commit()
+        payload = {
+            "page_offset": 0,
+            "page_limit": 10,
+            "dag_ids": ["TEST_DAG_I"],
+            "execution_date_gte": "2020-06-24T19:54:56Z",
+            "execution_date_lte": self.default_time,
+            "start_date_gte": "2020-06-24T19:54:56Z",
+            "start_date_lte": self.default_time,
+            "end_date_gte": "2020-06-24T19:54:56Z",
+            "end_date_lte": self.default_time
+        }
+        response = self.client.post("api/v1/dags/~/dagRuns/list", json=payload)
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.default_time,
+                        'external_trigger': True,
+                        'start_date': self.default_time,
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.default_time_2,
+                        'external_trigger': True,
+                        'start_date': self.default_time,
+                        'conf': {},
+                    },
+                ],
+                "total_entries": 2,
+            }
+        )
+
+

Review comment:
       Thanks, @ephraimbuddy! I will add more tests soon.
   
   @ephraimbuddy @mik-laj Could you help me figure out why my test case is failing?
   
   I am getting this error:
   **TypeError: get_dag_runs_batch() missing 1 required positional argument: 'dag_ids'**
   
   More details are here: https://github.com/apache/airflow/pull/9556/checks?check_run_id=816283026
   
   I get the same error in my local dev env.
   
   Thanks!







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: WIP : API Endpoint - Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r447161957



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -67,40 +67,72 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
     if dag_id != '~':
         query = query.filter(DagRun.dag_id == dag_id)
 
+    dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte,
+                                             execution_date_lte, start_date_gte, start_date_lte,
+                                             limit, offset)
+
+    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+                                                          total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+                    execution_date_gte, execution_date_lte,
+                    start_date_gte, start_date_lte, limit, offset):
+
+    query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                         execution_date_lte, start_date_gte, start_date_lte)
+    # apply offset and limit
+    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+    total_entries = session.query(func.count(DagRun.id)).scalar()
+    return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                 execution_date_lte, start_date_gte, start_date_lte):
     # filter start date
     if start_date_gte:
         query = query.filter(DagRun.start_date >= start_date_gte)
-
     if start_date_lte:
         query = query.filter(DagRun.start_date <= start_date_lte)
-
     # filter execution date
     if execution_date_gte:
         query = query.filter(DagRun.execution_date >= execution_date_gte)
-
     if execution_date_lte:
         query = query.filter(DagRun.execution_date <= execution_date_lte)
-
     # filter end date
     if end_date_gte:
         query = query.filter(DagRun.end_date >= end_date_gte)
-
     if end_date_lte:
         query = query.filter(DagRun.end_date <= end_date_lte)
+    return query
 
-    # apply offset and limit
-    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-    total_entries = session.query(func.count(DagRun.id)).scalar()
-
-    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
-                                                          total_entries=total_entries))
 
-
-def get_dag_runs_batch():
+@format_parameters({
+    'start_date_gte': format_datetime,
+    'start_date_lte': format_datetime,
+    'execution_date_gte': format_datetime,
+    'execution_date_lte': format_datetime,
+    'end_date_gte': format_datetime,
+    'end_date_lte': format_datetime,
+})
+@provide_session
+def get_dag_runs_batch(session, dag_ids, start_date_gte=None, start_date_lte=None,
+                       execution_date_gte=None, execution_date_lte=None,
+                       end_date_gte=None, end_date_lte=None, offset=None, limit=None):
     """
     Get list of DAG Runs
     """

Review comment:
       Your schema should be like this:
   
   ```
    from marshmallow import Schema, fields

    class ListDagRunsFormSchema(Schema):
        """ Schema for ListDagRunsForm """

        class Meta:
            """ Meta """
            datetimeformat = 'iso'
   
       page_offset = fields.Int(min=0)
       page_limit = fields.Int(min=1, max=100)
       dag_ids = fields.List(fields.Str())
       execution_date_gte = fields.DateTime()
       execution_date_lte = fields.DateTime()
       start_date_gte = fields.DateTime()
       start_date_lte = fields.DateTime()
       end_date_gte = fields.DateTime()
       end_date_lte = fields.DateTime()
   ```
   then instantiate it as `list_dag_runs_form_schema = ListDagRunsFormSchema(strict=True)`
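    
    For illustration, a small hedged sketch of how that schema behaves on `load` (assuming marshmallow 3 semantics, where `load` returns the deserialized dict and raises `ValidationError` on invalid input):
    
    ```
    from marshmallow import ValidationError

    # Valid payload: ISO datetime strings are parsed into datetime objects.
    data = list_dag_runs_form_schema.load({
        "page_limit": 10,
        "dag_ids": ["example_dag"],
        "execution_date_gte": "2020-06-11T18:00:00+00:00",
    })

    # Invalid payload: a malformed datetime raises ValidationError.
    try:
        list_dag_runs_form_schema.load({"execution_date_gte": "not-a-date"})
    except ValidationError as err:
        print(err.messages)  # e.g. {'execution_date_gte': ['Not a valid datetime.']}
    ```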




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: WIP : API Endpoint - Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r447162757



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -67,40 +67,72 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
     if dag_id != '~':
         query = query.filter(DagRun.dag_id == dag_id)
 
+    dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte,
+                                             execution_date_lte, start_date_gte, start_date_lte,
+                                             limit, offset)
+
+    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+                                                          total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+                    execution_date_gte, execution_date_lte,
+                    start_date_gte, start_date_lte, limit, offset):
+
+    query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                         execution_date_lte, start_date_gte, start_date_lte)
+    # apply offset and limit
+    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+    total_entries = session.query(func.count(DagRun.id)).scalar()
+    return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                 execution_date_lte, start_date_gte, start_date_lte):
     # filter start date
     if start_date_gte:
         query = query.filter(DagRun.start_date >= start_date_gte)
-
     if start_date_lte:
         query = query.filter(DagRun.start_date <= start_date_lte)
-
     # filter execution date
     if execution_date_gte:
         query = query.filter(DagRun.execution_date >= execution_date_gte)
-
     if execution_date_lte:
         query = query.filter(DagRun.execution_date <= execution_date_lte)
-
     # filter end date
     if end_date_gte:
         query = query.filter(DagRun.end_date >= end_date_gte)
-
     if end_date_lte:
         query = query.filter(DagRun.end_date <= end_date_lte)
+    return query
 
-    # apply offset and limit
-    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-    total_entries = session.query(func.count(DagRun.id)).scalar()
-
-    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
-                                                          total_entries=total_entries))
 
-
-def get_dag_runs_batch():
+@format_parameters({
+    'start_date_gte': format_datetime,
+    'start_date_lte': format_datetime,
+    'execution_date_gte': format_datetime,
+    'execution_date_lte': format_datetime,
+    'end_date_gte': format_datetime,
+    'end_date_lte': format_datetime,
+})
+@provide_session
+def get_dag_runs_batch(session, dag_ids, start_date_gte=None, start_date_lte=None,
+                       execution_date_gte=None, execution_date_lte=None,
+                       end_date_gte=None, end_date_lte=None, offset=None, limit=None):
     """
     Get list of DAG Runs
     """

Review comment:
       You have to add the schema in `dag_run_schema.py`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] takunnithan commented on pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#issuecomment-654893895


   Hi @mik-laj, is this good to merge? Could you please review the PR?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r449699768



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -80,15 +80,15 @@ class Meta:
         datetimeformat = 'iso'
         strict = True
 
-    page_offset = fields.Int(required=False, missing=0, min=0)
-    page_limit = fields.Int(required=False, missing=100, min=1)
-    dag_ids = fields.List(fields.Str(), required=False, missing=None)
-    execution_date_gte = fields.DateTime(required=False, missing=None)
-    execution_date_lte = fields.DateTime(required=False, missing=None)
-    start_date_gte = fields.DateTime(required=False, missing=None)
-    start_date_lte = fields.DateTime(required=False, missing=None)
-    end_date_gte = fields.DateTime(required=False, missing=None)
-    end_date_lte = fields.DateTime(required=False, missing=None)
+    page_offset = fields.Int(missing=0, min=0)
+    page_limit = fields.Int(missing=100, min=1)
+    dag_ids = fields.List(fields.Str(), missing=None)
+    execution_date_gte = fields.DateTime(missing=None)
+    execution_date_lte = fields.DateTime(missing=None)
+    start_date_gte = fields.DateTime(missing=None)
+    start_date_lte = fields.DateTime(missing=None)
+    end_date_gte = fields.DateTime(missing=None)
+    end_date_lte = fields.DateTime(missing=None)

Review comment:
       Do you run into any problems when you remove `missing=None`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448833504



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -346,6 +346,243 @@ def test_end_date_gte_lte(self, url, expected_dag_run_ids, session):
         self.assertEqual(dag_run_ids, expected_dag_run_ids)
 
 
+class TestGetDagRunBatch(TestDagRunEndpoint):
+    @provide_session
+    def test_should_respond_200(self, session):
+        dag_runs = self._create_test_dag_run()
+        session.add_all(dag_runs)
+        session.commit()

Review comment:
       ```suggestion
   
       def test_should_respond_200(self):
           dag_runs = self._create_test_dag_run()
           with create_session() as session:
               session.add_all(dag_runs)
   ```
   WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448835585



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -346,6 +346,243 @@ def test_end_date_gte_lte(self, url, expected_dag_run_ids, session):
         self.assertEqual(dag_run_ids, expected_dag_run_ids)
 
 
+class TestGetDagRunBatch(TestDagRunEndpoint):
+    @provide_session
+    def test_should_respond_200(self, session):
+        dag_runs = self._create_test_dag_run()
+        session.add_all(dag_runs)
+        session.commit()

Review comment:
       Or maybe it would be better to extend `_create_test_dag_run` with a commit option, for example `_create_test_dag_run(commit=True)`, so we can avoid code duplication?
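    
    A hedged sketch of that shape, assuming the DagRun fields used by the test payloads in this thread (`timezone.parse` converts ISO strings such as `self.default_time`):
    
    ```
    from airflow.models import DagRun
    from airflow.utils import timezone
    from airflow.utils.session import create_session


    def _create_test_dag_run(self, commit=False):
        """Build the two DagRun fixtures; optionally persist them in one step."""
        dag_runs = [
            DagRun(
                dag_id="TEST_DAG_ID",
                run_id=f"TEST_DAG_RUN_ID_{num}",
                execution_date=timezone.parse(date),
                start_date=timezone.parse(self.default_time),
                external_trigger=True,
                state="running",  # matches the expected responses above
            )
            for num, date in enumerate([self.default_time, self.default_time_2], start=1)
        ]
        if commit:
            # Spare callers the session.add_all()/commit boilerplate.
            with create_session() as session:
                session.add_all(dag_runs)
        return dag_runs
    ```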




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] takunnithan commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
takunnithan commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448088709



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -346,6 +346,55 @@ def test_end_date_gte_lte(self, url, expected_dag_run_ids, session):
         self.assertEqual(dag_run_ids, expected_dag_run_ids)
 
 
+class TestGetDagRunBatch(TestDagRunEndpoint):
+    @provide_session
+    def test_should_respond_200(self, session):
+        dag_runs = self._create_test_dag_run()
+        session.add_all(dag_runs)
+        session.commit()
+        payload = {
+            "page_offset": 0,
+            "page_limit": 10,
+            "dag_ids": ["TEST_DAG_I"],
+            "execution_date_gte": "2020-06-24T19:54:56Z",
+            "execution_date_lte": self.default_time,
+            "start_date_gte": "2020-06-24T19:54:56Z",
+            "start_date_lte": self.default_time,
+            "end_date_gte": "2020-06-24T19:54:56Z",
+            "end_date_lte": self.default_time
+        }
+        response = self.client.post("api/v1/dags/~/dagRuns/list", json=payload)
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.default_time,
+                        'external_trigger': True,
+                        'start_date': self.default_time,
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.default_time_2,
+                        'external_trigger': True,
+                        'start_date': self.default_time,
+                        'conf': {},
+                    },
+                ],
+                "total_entries": 2,
+            }
+        )
+
+

Review comment:
       Thanks @mik-laj




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448256015



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema):
     total_entries = fields.Int()
 
 
+class DagRunsBatchFormSchema(Schema):
+    """ Schema to validate and deserialize the Form(request payload) submitted to DagRun Batch endpoint"""
+
+    class Meta:
+        """ Meta """
+        datetimeformat = 'iso'
+        strict = True
+
+    page_offset = fields.Int(required=False, missing=0, min=0)
+    page_limit = fields.Int(required=False, missing=100, min=1)
+    dag_ids = fields.List(fields.Str(), required=False, missing=None)
+    execution_date_gte = fields.DateTime(required=False, missing=None)

Review comment:
       ```suggestion
       execution_date_gte = fields.DateTime()
   ```
    I suggest you remove the `missing` and `required` args, and only use them when the value should be something other than `None` and `False` respectively; those are already the defaults in marshmallow.
   https://marshmallow.readthedocs.io/en/2.x-line/api_reference.html#module-marshmallow.fields
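    
    One behavioural difference worth noting (a hedged aside, marshmallow 3 semantics): with `missing=None` the key is present as `None` after `load`, while without it the key is absent entirely; that is what the question above about removing `missing=None` is probing. A small illustration:
    
    ```
    from marshmallow import Schema, fields


    class ExampleSchema(Schema):
        explicit = fields.DateTime(missing=None)  # absent input -> key present as None
        implicit = fields.DateTime()              # absent input -> key omitted

    data = ExampleSchema().load({})
    assert data == {"explicit": None}  # "implicit" is not in the dict at all
    ```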




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: WIP : API Endpoint - Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r447160605



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -67,40 +67,72 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
     if dag_id != '~':
         query = query.filter(DagRun.dag_id == dag_id)
 
+    dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte,
+                                             execution_date_lte, start_date_gte, start_date_lte,
+                                             limit, offset)
+
+    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+                                                          total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+                    execution_date_gte, execution_date_lte,
+                    start_date_gte, start_date_lte, limit, offset):
+
+    query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                         execution_date_lte, start_date_gte, start_date_lte)
+    # apply offset and limit
+    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+    total_entries = session.query(func.count(DagRun.id)).scalar()
+    return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte,
+                                 execution_date_lte, start_date_gte, start_date_lte):
     # filter start date
     if start_date_gte:
         query = query.filter(DagRun.start_date >= start_date_gte)
-
     if start_date_lte:
         query = query.filter(DagRun.start_date <= start_date_lte)
-
     # filter execution date
     if execution_date_gte:
         query = query.filter(DagRun.execution_date >= execution_date_gte)
-
     if execution_date_lte:
         query = query.filter(DagRun.execution_date <= execution_date_lte)
-
     # filter end date
     if end_date_gte:
         query = query.filter(DagRun.end_date >= end_date_gte)
-
     if end_date_lte:
         query = query.filter(DagRun.end_date <= end_date_lte)
+    return query
 
-    # apply offset and limit
-    dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-    total_entries = session.query(func.count(DagRun.id)).scalar()
-
-    return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
-                                                          total_entries=total_entries))
 
-
-def get_dag_runs_batch():
+@format_parameters({
+    'start_date_gte': format_datetime,
+    'start_date_lte': format_datetime,
+    'execution_date_gte': format_datetime,
+    'execution_date_lte': format_datetime,
+    'end_date_gte': format_datetime,
+    'end_date_lte': format_datetime,
+})
+@provide_session
+def get_dag_runs_batch(session, dag_ids, start_date_gte=None, start_date_lte=None,
+                       execution_date_gte=None, execution_date_lte=None,
+                       end_date_gte=None, end_date_lte=None, offset=None, limit=None):
     """
     Get list of DAG Runs
     """

Review comment:
    ```
    body = request.get_json()
    try:
        data = list_dag_runs_form_schema.load(body)
    except ValidationError as err:
        raise BadRequest(detail=err.messages)
    ```
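    
    From there (a hedged continuation, reusing the helper names from the diff above), the deserialized dict can drive the shared query code:
    
    ```
    # Sketch only: continuing inside get_dag_runs_batch(); argument order
    # mirrors _fetch_dag_runs() from the diff above.
    query = session.query(DagRun)
    if data.get("dag_ids"):
        query = query.filter(DagRun.dag_id.in_(data["dag_ids"]))
    dag_runs, total_entries = _fetch_dag_runs(
        query, session,
        data.get("end_date_gte"), data.get("end_date_lte"),
        data.get("execution_date_gte"), data.get("execution_date_lte"),
        data.get("start_date_gte"), data.get("start_date_lte"),
        data.get("page_limit"), data.get("page_offset"),
    )
    return dagrun_collection_schema.dump(
        DAGRunCollection(dag_runs=dag_runs, total_entries=total_entries)
    )
    ```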




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: WIP : API Endpoint - Batch

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r446697286



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -346,6 +346,55 @@ def test_end_date_gte_lte(self, url, expected_dag_run_ids, session):
         self.assertEqual(dag_run_ids, expected_dag_run_ids)
 
 
+class TestGetDagRunBatch(TestDagRunEndpoint):
+    @provide_session
+    def test_should_respond_200(self, session):
+        dag_runs = self._create_test_dag_run()
+        session.add_all(dag_runs)
+        session.commit()
+        payload = {
+            "page_offset": 0,
+            "page_limit": 10,
+            "dag_ids": ["TEST_DAG_I"],
+            "execution_date_gte": "2020-06-24T19:54:56Z",
+            "execution_date_lte": self.default_time,
+            "start_date_gte": "2020-06-24T19:54:56Z",
+            "start_date_lte": self.default_time,
+            "end_date_gte": "2020-06-24T19:54:56Z",
+            "end_date_lte": self.default_time
+        }
+        response = self.client.post("api/v1/dags/~/dagRuns/list", json=payload)
+        assert response.status_code == 200
+        self.assertEqual(
+            response.json,
+            {
+                "dag_runs": [
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_1',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.default_time,
+                        'external_trigger': True,
+                        'start_date': self.default_time,
+                        'conf': {},
+                    },
+                    {
+                        'dag_id': 'TEST_DAG_ID',
+                        'dag_run_id': 'TEST_DAG_RUN_ID_2',
+                        'end_date': None,
+                        'state': 'running',
+                        'execution_date': self.default_time_2,
+                        'external_trigger': True,
+                        'start_date': self.default_time,
+                        'conf': {},
+                    },
+                ],
+                "total_entries": 2,
+            }
+        )
+
+

Review comment:
       I would suggest you add more tests, e.g. for the page limit and offset, the execution date filters, etc. You may use `parameterized` for the tests.
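    
    A hedged sketch of what that could look like with `parameterized` (the payloads and expected run ids are illustrative, and `commit=True` assumes the helper extension suggested earlier in this thread):
    
    ```
    from parameterized import parameterized


    class TestGetDagRunBatchFilters(TestDagRunEndpoint):
        @parameterized.expand([
            ({"page_limit": 1}, ["TEST_DAG_RUN_ID_1"]),
            ({"page_offset": 1}, ["TEST_DAG_RUN_ID_2"]),
        ])
        def test_filters_dag_runs(self, payload, expected_run_ids):
            self._create_test_dag_run(commit=True)  # hypothetical commit option
            response = self.client.post("api/v1/dags/~/dagRuns/list", json=payload)
            assert response.status_code == 200
            run_ids = [run["dag_run_id"] for run in response.json["dag_runs"]]
            assert run_ids == expected_run_ids
    ```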




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org