Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/07 18:42:49 UTC

[GitHub] [airflow] bbovenzi opened a new pull request, #26942: Filtering datasets by recent update events

bbovenzi opened a new pull request, #26942:
URL: https://github.com/apache/airflow/pull/26942

   Add the ability to filter datasets by when their last update events occurred (e.g. only show datasets that had events in the past 7 days).
   
   <img width="587" alt="Screen Shot 2022-10-07 at 2 42 40 PM" src="https://user-images.githubusercontent.com/4600967/194627505-73ff1777-bedb-42d2-91a7-6124c592a5ed.png">
   
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] blag commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992640772


##########
airflow/www/views.py:
##########
@@ -3574,13 +3586,27 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if updated_before or updated_after:
+                count_query = count_query.outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)

Review Comment:
   For the unfiltered query we want outer joins.
   
   But for the filtered results query and for the filtered count query, doing an outer join and then filtering the results later in a WHERE clause is effectively an inner join, right?
   
   I (possibly naively!) would expect most query planners to optimize that into an inner join, so the performance difference should be nil or negligible. And performance is always a fickle thing to plan for preemptively and measure after the fact.
   
   Part of me thinks we should keep it as similar to the results query as possible: use outer joins and filter later on.
   
   Thoughts?
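The equivalence described above can be sketched with a tiny standalone example — plain SQL over `sqlite3`, with throwaway `dataset`/`dataset_event` tables standing in for the Airflow models (not the real schema):

```python
import sqlite3

# Minimal sketch, not the Airflow schema: two throwaway tables standing in
# for DatasetModel and DatasetEvent.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dataset (id INTEGER PRIMARY KEY, uri TEXT);
    CREATE TABLE dataset_event (id INTEGER PRIMARY KEY, dataset_id INTEGER, ts TEXT);
    INSERT INTO dataset VALUES (1, 's3://a'), (2, 's3://b');
    INSERT INTO dataset_event VALUES (10, 1, '2022-10-01');
    """
)

# LEFT JOIN plus a WHERE filter on the right-hand table: dataset 2, which has
# no events, drops out because its event columns are NULL and NULL fails the
# comparison...
left_join_where = conn.execute(
    """
    SELECT d.id FROM dataset d
    LEFT JOIN dataset_event e ON e.dataset_id = d.id
    WHERE e.ts >= '2022-09-01'
    """
).fetchall()

# ...which is exactly what a plain INNER JOIN returns.
inner_join = conn.execute(
    """
    SELECT d.id FROM dataset d
    JOIN dataset_event e ON e.dataset_id = d.id
    WHERE e.ts >= '2022-09-01'
    """
).fetchall()

print(left_join_where, inner_join)  # [(1,)] [(1,)]
```

Whether a planner then executes the two forms identically is engine-specific, but the result sets are the same.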





[GitHub] [airflow] blag commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r991681309


##########
airflow/www/views.py:
##########
@@ -3535,7 +3544,25 @@ def datasets_summary(self):
                 )
             }, 400
 
-        limit = 50 if limit > 50 else limit
+        updated_after = None
+        if untrusted_updated_after:
+            # Try to figure out how other functions in this module safely parse datetimes submitted by users
+            # and do the same thing here
+            updated_after = _safe_parse_datetime(untrusted_updated_after)
+        updated_before = None
+        if untrusted_updated_before:
+            # Clean this data the same way you cleaned updated_after
+            updated_before = _safe_parse_datetime(untrusted_updated_before)
+
+        # split_with_any_tags = []
+        # if isinstance(tags, str):
+        #     split_with_any_tags = with_any_tags.split(",") if "," in with_any_tags else [with_any_tags]
+        # else:
+        #     return {
+        #         "detail": (
+        #             f"The with_any_tags query parameter must be a string, or comma-separated strings"
+        #         )
+        #     }, 400

Review Comment:
   Yep, we should either remove this or implement it. @bbovenzi You mind making that call?





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992756303


##########
airflow/www/views.py:
##########
@@ -3574,23 +3574,28 @@ def datasets_summary(self):
 
             count_query = session.query(func.count(DatasetModel.id))
 
+            has_event_filters = bool(updated_before or updated_after)
+
             query = (
                 session.query(
                     DatasetModel.id,
                     DatasetModel.uri,
                     func.max(DatasetEvent.timestamp).label("last_dataset_update"),
-                    func.count(distinct(DatasetEvent.id)).label("total_updates"),
+                    func.count(func.sum(func.case(DatasetEvent.id.is_not(None), 1, else_=0))).label(
+                        "total_updates"
+                    ),
                 )
-                .outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+                .join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id, isouter=not has_event_filters)

Review Comment:
   and with event filters we do an inner join, so that no dataset is returned when it has no events matching the filters





[GitHub] [airflow] blag commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992641426


##########
airflow/www/views.py:
##########
@@ -3559,11 +3572,10 @@ def datasets_summary(self):
                     if session.bind.dialect.name == "postgresql":
                         order_by = (order_by[0].nulls_first(), *order_by[1:])
 
-            total_entries = session.query(func.count(DatasetModel.id)).scalar()
+            count_query = session.query(func.count(DatasetModel.id))

Review Comment:
   @bbovenzi For an example, see how we count the number of dataset events in the results query above.





[GitHub] [airflow] blag commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r991573178


##########
airflow/www/views.py:
##########
@@ -3574,13 +3600,32 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if uri_pattern:
+                count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+                query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+
+            if updated_after:
+                count_query = count_query.outerjoin(
+                    DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
+                ).filter(DatasetEvent.timestamp >= updated_after)
+                query = query.filter(DatasetEvent.timestamp >= updated_after)
+            if updated_before:
+                count_query = count_query.outerjoin(
+                    DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
+                ).filter(DatasetEvent.timestamp <= updated_before)
+                query = query.filter(DatasetEvent.timestamp <= updated_before)

Review Comment:
   I like the second option - seems cleaner to me. Thanks!





[GitHub] [airflow] blag commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r991572978


##########
airflow/www/views.py:
##########
@@ -3574,13 +3600,32 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if uri_pattern:
+                count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+                query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+
+            if updated_after:
+                count_query = count_query.outerjoin(

Review Comment:
   Good call - yep I think we want an inner join in there.





[GitHub] [airflow] bbovenzi merged pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
bbovenzi merged PR #26942:
URL: https://github.com/apache/airflow/pull/26942




[GitHub] [airflow] bbovenzi commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992596163


##########
airflow/www/views.py:
##########
@@ -3574,13 +3586,27 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if updated_before or updated_after:
+                count_query = count_query.outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)

Review Comment:
   Isn't that what you suggested?





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r990718192


##########
airflow/www/views.py:
##########
@@ -3574,13 +3600,32 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if uri_pattern:
+                count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+                query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+
+            if updated_after:
+                count_query = count_query.outerjoin(
+                    DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
+                ).filter(DatasetEvent.timestamp >= updated_after)
+                query = query.filter(DatasetEvent.timestamp >= updated_after)
+            if updated_before:
+                count_query = count_query.outerjoin(
+                    DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
+                ).filter(DatasetEvent.timestamp <= updated_before)
+                query = query.filter(DatasetEvent.timestamp <= updated_before)

Review Comment:
   If the user provides both `before` and `after` filters, it looks like you would add two joins to the same table, but you only need one. You could do something like this instead.
   
   ```suggestion
               if uri_pattern:
                   count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                   query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
   
               if updated_after:
                   count_query = count_query.outerjoin(
                       DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
                   ).filter(DatasetEvent.timestamp >= updated_after)
                   query = query.filter(DatasetEvent.timestamp >= updated_after)
               if updated_before:
                   count_query = count_query.outerjoin(
                       DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
                   ).filter(DatasetEvent.timestamp <= updated_before)
                   query = query.filter(DatasetEvent.timestamp <= updated_before)
   ```





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992679921


##########
airflow/www/views.py:
##########
@@ -3574,13 +3586,27 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if updated_before or updated_after:
+                count_query = count_query.outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)

Review Comment:
   > Isn't that what you suggested?
   
   I just pointed out that it was sort of ambiguous what the intention was, since using an outer join in combination with those filters effectively makes it an inner join.
   
   The question I'm highlighting is about logic, not necessarily performance. The question is: if a dataset has no events in the time range, do you still want it to appear in the result? If so, you need to do an outer join but move the filters into the join condition.
   
   I can hop on a call if necessary.
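The "move the filters to the join condition" option can be sketched the same way (plain `sqlite3`, throwaway tables, not the Airflow schema): putting the timestamp filter in the ON clause keeps the outer-join behavior, so a dataset with no events in the range still appears, just with a zero count.

```python
import sqlite3

# Hypothetical mini-schema standing in for DatasetModel / DatasetEvent.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dataset (id INTEGER PRIMARY KEY, uri TEXT);
    CREATE TABLE dataset_event (id INTEGER PRIMARY KEY, dataset_id INTEGER, ts TEXT);
    INSERT INTO dataset VALUES (1, 's3://a'), (2, 's3://b');
    INSERT INTO dataset_event VALUES (10, 1, '2022-10-01');
    """
)

# Filter in the JOIN condition, not the WHERE clause: the join stays a real
# outer join, so dataset 2 survives with a zero event count.
rows = conn.execute(
    """
    SELECT d.id, COUNT(e.id) AS total_updates
    FROM dataset d
    LEFT JOIN dataset_event e
      ON e.dataset_id = d.id AND e.ts >= '2022-09-01'
    GROUP BY d.id
    ORDER BY d.id
    """
).fetchall()

print(rows)  # [(1, 1), (2, 0)]
```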





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r991579757


##########
airflow/www/views.py:
##########
@@ -3535,7 +3544,25 @@ def datasets_summary(self):
                 )
             }, 400
 
-        limit = 50 if limit > 50 else limit
+        updated_after = None
+        if untrusted_updated_after:
+            # Try to figure out how other functions in this module safely parse datetimes submitted by users
+            # and do the same thing here
+            updated_after = _safe_parse_datetime(untrusted_updated_after)
+        updated_before = None
+        if untrusted_updated_before:
+            # Clean this data the same way you cleaned updated_after
+            updated_before = _safe_parse_datetime(untrusted_updated_before)
+

Review Comment:
   It occurred to me you could remove this block by updating `_safe_parse_datetime` to be something like this:
   
   ```
   def _safe_parse_datetime(v: str, strict=True):
       """
       Parse datetime and return error message for invalid dates.
   
       :param v: the string value to be parsed
       :param strict: when False, an empty value returns None instead of aborting
       """
       if not strict and not v:
           return None
       try:
           return timezone.parse(v)
       except (TypeError, ParserError):
           abort(400, f"Invalid datetime: {v!r}")
   ```
   
   wdyt?
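As a standalone sanity check of the `strict=False` idea, here is a hedged sketch: `datetime.fromisoformat` stands in for Airflow's `timezone.parse`, and a raised `ValueError` stands in for Flask's `abort(400, ...)`.

```python
from datetime import datetime
from typing import Optional


def safe_parse_datetime(v: Optional[str], strict: bool = True) -> Optional[datetime]:
    """Sketch of the suggested helper; not the Airflow implementation."""
    if not strict and not v:
        # An optional query parameter that simply wasn't supplied.
        return None
    try:
        return datetime.fromisoformat(v)
    except (TypeError, ValueError):
        # Airflow's version would abort(400, ...) here instead.
        raise ValueError(f"Invalid datetime: {v!r}")


print(safe_parse_datetime(None, strict=False))     # None
print(safe_parse_datetime("2022-10-07T18:42:49"))  # 2022-10-07 18:42:49
```

With this shape, the caller can do `updated_after = safe_parse_datetime(untrusted_updated_after, strict=False)` in one line, removing the two `if` blocks above.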





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r991585352


##########
airflow/www/views.py:
##########
@@ -3535,7 +3544,25 @@ def datasets_summary(self):
                 )
             }, 400
 
-        limit = 50 if limit > 50 else limit
+        updated_after = None
+        if untrusted_updated_after:
+            # Try to figure out how other functions in this module safely parse datetimes submitted by users
+            # and do the same thing here
+            updated_after = _safe_parse_datetime(untrusted_updated_after)
+        updated_before = None
+        if untrusted_updated_before:
+            # Clean this data the same way you cleaned updated_after
+            updated_before = _safe_parse_datetime(untrusted_updated_before)
+
+        # split_with_any_tags = []
+        # if isinstance(tags, str):
+        #     split_with_any_tags = with_any_tags.split(",") if "," in with_any_tags else [with_any_tags]
+        # else:
+        #     return {
+        #         "detail": (
+        #             f"The with_any_tags query parameter must be a string, or comma-separated strings"
+        #         )
+        #     }, 400

Review Comment:
   Not sure if there's a plan for this bit, but as a general rule we don't commit commented-out code.





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r991727931


##########
airflow/www/views.py:
##########
@@ -3535,7 +3544,25 @@ def datasets_summary(self):
                 )
             }, 400
 
-        limit = 50 if limit > 50 else limit
+        updated_after = None
+        if untrusted_updated_after:
+            # Try to figure out how other functions in this module safely parse datetimes submitted by users
+            # and do the same thing here
+            updated_after = _safe_parse_datetime(untrusted_updated_after)
+        updated_before = None
+        if untrusted_updated_before:
+            # Clean this data the same way you cleaned updated_after
+            updated_before = _safe_parse_datetime(untrusted_updated_before)
+

Review Comment:
   I don't have a strong opinion about the naming... it's a "private" method anyway





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r990717913


##########
airflow/www/views.py:
##########
@@ -3574,13 +3600,32 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if uri_pattern:
+                count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+                query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+
+            if updated_after:
+                count_query = count_query.outerjoin(

Review Comment:
   Adding these filters presents an ambiguity.
   
   Do you still return all the datasets, with the filters only applying to the counts?
   Or do you return only the datasets with updates in the time range?
   
   Personally I think the more intuitive behavior is to return only those datasets with updates in the period. But I guess this is just an internal API, so really we have to look at why we're adding this and how it's going to be used.
   
   So, in short: are you sure you want a left join here?





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r990718192


##########
airflow/www/views.py:
##########
@@ -3574,13 +3600,32 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if uri_pattern:
+                count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+                query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+
+            if updated_after:
+                count_query = count_query.outerjoin(
+                    DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
+                ).filter(DatasetEvent.timestamp >= updated_after)
+                query = query.filter(DatasetEvent.timestamp >= updated_after)
+            if updated_before:
+                count_query = count_query.outerjoin(
+                    DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
+                ).filter(DatasetEvent.timestamp <= updated_before)
+                query = query.filter(DatasetEvent.timestamp <= updated_before)

Review Comment:
   If the user provides both `before` and `after` filters, it looks like you would add two joins to the same table, but you only need one. Or maybe SQLAlchemy is smart enough to add only one? If not, you could avoid it by doing something like this instead.
   
   ```suggestion
               if uri_pattern:
                   count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                   query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
   
               if updated_after:
                   count_query = count_query.outerjoin(
                       DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
                   ).filter(DatasetEvent.timestamp >= updated_after)
                   query = query.filter(DatasetEvent.timestamp >= updated_after)
               if updated_before:
                   count_query = count_query.outerjoin(
                       DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
                   ).filter(DatasetEvent.timestamp <= updated_before)
                   query = query.filter(DatasetEvent.timestamp <= updated_before)
   ```





[GitHub] [airflow] bbovenzi commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992596648


##########
airflow/www/views.py:
##########
@@ -3574,13 +3586,27 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if updated_before or updated_after:
+                count_query = count_query.outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+            filters = []
+            if uri_pattern:
+                filters.append(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+            if updated_after:
+                filters.append(DatasetEvent.timestamp >= updated_after)
+            if updated_before:
+                filters.append(DatasetEvent.timestamp <= updated_before)
+
+            for filter in filters:
+                query = query.filter(filter)
+                count_query = count_query.filter(filter)

Review Comment:
   Doh, of course.





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992757052


##########
airflow/www/views.py:
##########
@@ -3574,23 +3574,28 @@ def datasets_summary(self):
 
             count_query = session.query(func.count(DatasetModel.id))
 
+            has_event_filters = bool(updated_before or updated_after)
+
             query = (
                 session.query(
                     DatasetModel.id,
                     DatasetModel.uri,
                     func.max(DatasetEvent.timestamp).label("last_dataset_update"),
-                    func.count(distinct(DatasetEvent.id)).label("total_updates"),
+                    func.count(func.sum(func.case(DatasetEvent.id.is_not(None), 1, else_=0))).label(
+                        "total_updates"
+                    ),
                 )
-                .outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+                .join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id, isouter=not has_event_filters)
                 .group_by(
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
                 .order_by(*order_by)
             )
 
-            if updated_before or updated_after:
-                count_query = count_query.outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+            if has_event_filters:
+                count_query = count_query.join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)

Review Comment:
   If there are event filters, we need to join, and an inner join is fine. If not, we don't need to join at all, because it's just counting datasets.
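The count-query half of this can be sketched standalone (`sqlite3`, toy tables, not the Airflow schema). One subtlety worth noting: once events are joined in, counting plain rows would multiply each dataset by its event count, so the sketch counts DISTINCT dataset ids — an assumption on my part about the intended semantics.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dataset (id INTEGER PRIMARY KEY);
    CREATE TABLE dataset_event (id INTEGER PRIMARY KEY, dataset_id INTEGER, ts TEXT);
    INSERT INTO dataset VALUES (1), (2), (3);
    -- dataset 1 has two events in range, dataset 2 has none, dataset 3 has one
    INSERT INTO dataset_event VALUES
        (10, 1, '2022-10-01'), (11, 1, '2022-10-05'), (12, 3, '2022-10-02');
    """
)

# No event filters: just count datasets; no join needed.
total = conn.execute("SELECT COUNT(id) FROM dataset").fetchone()[0]

# With event filters: an inner join is fine, but count DISTINCT dataset ids so
# a dataset with several matching events is counted once, not per event.
filtered = conn.execute(
    """
    SELECT COUNT(DISTINCT d.id)
    FROM dataset d
    JOIN dataset_event e ON e.dataset_id = d.id
    WHERE e.ts >= '2022-09-01'
    """
).fetchone()[0]

print(total, filtered)  # 3 2
```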





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992755416


##########
airflow/www/views.py:
##########
@@ -3574,23 +3574,28 @@ def datasets_summary(self):
 
             count_query = session.query(func.count(DatasetModel.id))
 
+            has_event_filters = bool(updated_before or updated_after)
+
             query = (
                 session.query(
                     DatasetModel.id,
                     DatasetModel.uri,
                     func.max(DatasetEvent.timestamp).label("last_dataset_update"),
-                    func.count(distinct(DatasetEvent.id)).label("total_updates"),
+                    func.count(func.sum(func.case(DatasetEvent.id.is_not(None), 1, else_=0))).label(

Review Comment:
   When there are no event filters, we do a left join, so the event columns may be NULL even though the dataset exists; total updates should then be _zero_, not NULL, hence this CASE WHEN.
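
The point can be illustrated with plain sqlite3 from the standard library. This is a hedged sketch, not Airflow code: the `dataset` and `dataset_event` tables and their columns are simplified stand-ins for `DatasetModel` and `DatasetEvent`, and the SQL spells out the SUM(CASE WHEN ...) directly rather than going through SQLAlchemy.

```python
import sqlite3

# In-memory stand-in schema: dataset 2 deliberately has no events.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dataset (id INTEGER PRIMARY KEY, uri TEXT);
    CREATE TABLE dataset_event (id INTEGER PRIMARY KEY, dataset_id INTEGER);
    INSERT INTO dataset VALUES (1, 's3://a'), (2, 's3://b');
    INSERT INTO dataset_event VALUES (10, 1), (11, 1);
    """
)

# LEFT JOIN keeps dataset 2 with NULL event columns; the CASE WHEN turns
# that NULL row into a 0 instead of dropping or NULL-ing the aggregate.
rows = conn.execute(
    """
    SELECT d.id,
           SUM(CASE WHEN e.id IS NOT NULL THEN 1 ELSE 0 END) AS total_updates
    FROM dataset d
    LEFT JOIN dataset_event e ON e.dataset_id = d.id
    GROUP BY d.id
    ORDER BY d.id
    """
).fetchall()
print(rows)  # [(1, 2), (2, 0)] -- dataset 2 reports zero updates, not NULL
```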





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992495172


##########
airflow/www/views.py:
##########
@@ -3559,11 +3572,10 @@ def datasets_summary(self):
                     if session.bind.dialect.name == "postgresql":
                         order_by = (order_by[0].nulls_first(), *order_by[1:])
 
-            total_entries = session.query(func.count(DatasetModel.id)).scalar()
+            count_query = session.query(func.count(DatasetModel.id))

Review Comment:
   Since you are not doing a GROUP BY in the count query, you need to do a COUNT(DISTINCT ...).
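
A minimal sqlite3 sketch (simplified, hypothetical stand-in tables, not the real Airflow models) shows why: once the count query joins the event table, a plain COUNT counts one row per matching event and inflates the total, while COUNT(DISTINCT ...) still counts datasets.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dataset (id INTEGER PRIMARY KEY);
    CREATE TABLE dataset_event (id INTEGER PRIMARY KEY, dataset_id INTEGER);
    INSERT INTO dataset VALUES (1), (2);
    INSERT INTO dataset_event VALUES (10, 1), (11, 1), (12, 2);
    """
)

join_sql = "FROM dataset d JOIN dataset_event e ON e.dataset_id = d.id"
# Naive count: one row per (dataset, event) pair after the join.
naive = conn.execute(f"SELECT COUNT(d.id) {join_sql}").fetchone()[0]
# Distinct count: collapses back to the number of datasets.
distinct = conn.execute(f"SELECT COUNT(DISTINCT d.id) {join_sql}").fetchone()[0]
print(naive, distinct)  # 3 2 -- only the DISTINCT count matches the 2 datasets
```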



##########
airflow/www/views.py:
##########
@@ -3574,13 +3586,27 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if updated_before or updated_after:
+                count_query = count_query.outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+            filters = []
+            if uri_pattern:
+                filters.append(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+            if updated_after:
+                filters.append(DatasetEvent.timestamp >= updated_after)
+            if updated_before:
+                filters.append(DatasetEvent.timestamp <= updated_before)
+
+            for filter in filters:
+                query = query.filter(filter)
+                count_query = count_query.filter(filter)

Review Comment:
   ```suggestion
               query = query.filter(*filters)
               count_query = count_query.filter(*filters)
   ```
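
The "collect predicates in a list, apply them all at once" pattern suggested above can be sketched outside SQLAlchemy too. This is an illustrative stand-in using sqlite3 with parameterized SQL; `uri_pattern`, `updated_after`, and `updated_before` are hypothetical request parameters, and the single-table schema is a simplification of the real models.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dataset (id INTEGER PRIMARY KEY, uri TEXT, last_update TEXT);
    INSERT INTO dataset VALUES
        (1, 's3://bucket/a', '2022-10-01'),
        (2, 's3://bucket/b', '2022-09-01'),
        (3, 'gs://bucket/c', '2022-10-05');
    """
)

def list_datasets(uri_pattern=None, updated_after=None, updated_before=None):
    # Accumulate optional predicates, then apply them in one WHERE clause.
    predicates, params = [], []
    if uri_pattern:
        predicates.append("uri LIKE ?")
        params.append(f"%{uri_pattern}%")
    if updated_after:
        predicates.append("last_update >= ?")
        params.append(updated_after)
    if updated_before:
        predicates.append("last_update <= ?")
        params.append(updated_before)
    sql = "SELECT id FROM dataset"
    if predicates:
        sql += " WHERE " + " AND ".join(predicates)
    return [row[0] for row in conn.execute(sql, params)]

print(list_datasets())                                          # [1, 2, 3]
print(list_datasets(uri_pattern="s3", updated_after="2022-09-15"))  # [1]
```

In the SQLAlchemy version, the same list feeds both `query.filter(*filters)` and `count_query.filter(*filters)`, which keeps the two queries in sync.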



##########
airflow/www/views.py:
##########
@@ -3574,13 +3586,27 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if updated_before or updated_after:
+                count_query = count_query.outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)

Review Comment:
   are you sure you want outerjoin?
   





[GitHub] [airflow] blag commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r991685663


##########
airflow/www/views.py:
##########
@@ -3535,7 +3544,25 @@ def datasets_summary(self):
                 )
             }, 400
 
-        limit = 50 if limit > 50 else limit
+        updated_after = None
+        if untrusted_updated_after:
+            # Try to figure out how other functions in this module safely parse datetimes submitted by users
+            # and do the same thing here
+            updated_after = _safe_parse_datetime(untrusted_updated_after)
+        updated_before = None
+        if untrusted_updated_before:
+            # Clean this data the same way you cleaned updated_after
+            updated_before = _safe_parse_datetime(untrusted_updated_before)
+

Review Comment:
   I like the concept, although I like the semantics of the term "force" better than I like "strict" (because strict could also apply to how a datetime string is parsed):
   
   ```python
   def _safe_parse_datetime(v: str, force=True):
       """
       Parse datetime and return error message for invalid dates
   
       :param v: the string value to be parsed
       :param force: If True, force the value to be parseable or raise an exception.
                     If False, return None if v is False-y, otherwise parse or raise an exception.
                     Defaults to True.
       """
       if not force:
           if not v:
               return None
       try:
           return timezone.parse(v)
       except (TypeError, ParserError):
           abort(400, f"Invalid datetime: {v!r}")
   ```
   
   But I could also see the argument that it's not this function's job to handle a False-y value parameter.
   
   Thoughts?
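
A self-contained sketch of the shape being discussed, with stand-ins so it runs on its own: Airflow's `timezone.parse` is replaced by `datetime.fromisoformat`, and Flask's `abort(400, ...)` by raising `ValueError`. The `force` naming follows the preference stated above; this is illustrative, not the final helper.

```python
from datetime import datetime

def safe_parse_datetime(v, force=True):
    """Parse v as a datetime.

    With force=False, a false-y value returns None instead of failing;
    with force=True (the default), any unparseable value is an error.
    """
    if not force and not v:
        return None
    try:
        return datetime.fromisoformat(v)
    except (TypeError, ValueError):
        # In the real view this would be abort(400, f"Invalid datetime: {v!r}")
        raise ValueError(f"Invalid datetime: {v!r}")

print(safe_parse_datetime("2022-10-07T18:42:49"))   # 2022-10-07 18:42:49
print(safe_parse_datetime(None, force=False))       # None
```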





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992755675


##########
airflow/www/views.py:
##########
@@ -3574,23 +3574,28 @@ def datasets_summary(self):
 
             count_query = session.query(func.count(DatasetModel.id))
 
+            has_event_filters = bool(updated_before or updated_after)
+
             query = (
                 session.query(
                     DatasetModel.id,
                     DatasetModel.uri,
                     func.max(DatasetEvent.timestamp).label("last_dataset_update"),
-                    func.count(distinct(DatasetEvent.id)).label("total_updates"),
+                    func.count(func.sum(func.case(DatasetEvent.id.is_not(None), 1, else_=0))).label(
+                        "total_updates"
+                    ),
                 )
-                .outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+                .join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id, isouter=not has_event_filters)

Review Comment:
   When there are no event filters, we do an outer join.





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r990718470


##########
airflow/www/views.py:
##########
@@ -3574,13 +3600,32 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if uri_pattern:
+                count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+                query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+
+            if updated_after:
+                count_query = count_query.outerjoin(
+                    DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
+                ).filter(DatasetEvent.timestamp >= updated_after)
+                query = query.filter(DatasetEvent.timestamp >= updated_after)
+            if updated_before:
+                count_query = count_query.outerjoin(
+                    DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
+                ).filter(DatasetEvent.timestamp <= updated_before)
+                query = query.filter(DatasetEvent.timestamp <= updated_before)

Review Comment:
   Or this I think would also work and is a bit cleaner:
   
   ```suggestion
               if updated_before or updated_after:
                   count_query = count_query.outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
   
               filters = []
   
               if uri_pattern:
                   filters.append(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
   
               if updated_after:
                   filters.append(DatasetEvent.timestamp >= updated_after)
   
               if updated_before:
                   filters.append(DatasetEvent.timestamp <= updated_before)
   
   ```





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r990719267


##########
airflow/www/views.py:
##########
@@ -3574,13 +3600,32 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if uri_pattern:
+                count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+                query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+
+            if updated_after:
+                count_query = count_query.outerjoin(

Review Comment:
   And if you don't want that behavior, then you'd want to move the filter from the WHERE clause to the join condition (the ON clause).
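
The WHERE-vs-ON distinction can be demonstrated with a small sqlite3 example (a hedged stand-in, not Airflow code; the tables are simplified stand-ins for the real models). A timestamp predicate in the WHERE clause discards datasets with no events, because their NULL timestamps fail the comparison, so the LEFT JOIN behaves like an INNER JOIN; moving the predicate into the ON clause keeps those datasets.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dataset (id INTEGER PRIMARY KEY);
    CREATE TABLE dataset_event (id INTEGER PRIMARY KEY, dataset_id INTEGER, ts TEXT);
    INSERT INTO dataset VALUES (1), (2);
    INSERT INTO dataset_event VALUES (10, 1, '2022-10-01');
    """
)

# Predicate in WHERE: dataset 2's NULL ts fails the comparison and is dropped.
in_where = conn.execute(
    """
    SELECT DISTINCT d.id FROM dataset d
    LEFT JOIN dataset_event e ON e.dataset_id = d.id
    WHERE e.ts >= '2022-09-01'
    ORDER BY d.id
    """
).fetchall()

# Predicate in ON: the join condition fails for dataset 2, but the LEFT JOIN
# still emits it with NULL event columns.
in_on = conn.execute(
    """
    SELECT DISTINCT d.id FROM dataset d
    LEFT JOIN dataset_event e ON e.dataset_id = d.id AND e.ts >= '2022-09-01'
    ORDER BY d.id
    """
).fetchall()

print(in_where, in_on)  # [(1,)] [(1,), (2,)]
```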





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r991650226


##########
airflow/www/views.py:
##########
@@ -3574,13 +3600,32 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if uri_pattern:
+                count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+                query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+
+            if updated_after:
+                count_query = count_query.outerjoin(
+                    DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
+                ).filter(DatasetEvent.timestamp >= updated_after)
+                query = query.filter(DatasetEvent.timestamp >= updated_after)
+            if updated_before:
+                count_query = count_query.outerjoin(
+                    DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id
+                ).filter(DatasetEvent.timestamp <= updated_before)
+                query = query.filter(DatasetEvent.timestamp <= updated_before)

Review Comment:
   @bbovenzi in the last commit you pushed, it seems that now you don't apply the filters to both queries





[GitHub] [airflow] dstandish commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r990719051


##########
airflow/www/views.py:
##########
@@ -3574,13 +3600,32 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if uri_pattern:
+                count_query = count_query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+                query = query.filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
+
+            if updated_after:
+                count_query = count_query.outerjoin(

Review Comment:
   Er, actually, now that I think of it: if the user is providing before or after, then it will in effect be an inner join anyway, so we may as well make it an inner join here.





[GitHub] [airflow] blag commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r991681619


##########
airflow/www/views.py:
##########
@@ -3535,7 +3544,25 @@ def datasets_summary(self):
                 )
             }, 400
 
-        limit = 50 if limit > 50 else limit
+        updated_after = None
+        if untrusted_updated_after:
+            # Try to figure out how other functions in this module safely parse datetimes submitted by users
+            # and do the same thing here
+            updated_after = _safe_parse_datetime(untrusted_updated_after)
+        updated_before = None
+        if untrusted_updated_before:
+            # Clean this data the same way you cleaned updated_after
+            updated_before = _safe_parse_datetime(untrusted_updated_before)
+
+        # split_with_any_tags = []
+        # if isinstance(tags, str):
+        #     split_with_any_tags = with_any_tags.split(",") if "," in with_any_tags else [with_any_tags]
+        # else:
+        #     return {
+        #         "detail": (
+        #             f"The with_any_tags query parameter must be a string, or comma-separated strings"
+        #         )
+        #     }, 400

Review Comment:
   Oops, I should have checked before writing - this is resolved. :)





[GitHub] [airflow] bbovenzi commented on a diff in pull request #26942: Filtering datasets by recent update events

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on code in PR #26942:
URL: https://github.com/apache/airflow/pull/26942#discussion_r992707538


##########
airflow/www/views.py:
##########
@@ -3574,13 +3586,27 @@ def datasets_summary(self):
                     DatasetModel.id,
                     DatasetModel.uri,
                 )
-                .filter(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
                 .order_by(*order_by)
-                .offset(offset)
-                .limit(limit)
-                .all()
-            ]
-            data = {"datasets": datasets, "total_entries": total_entries}
+            )
+
+            if updated_before or updated_after:
+                count_query = count_query.outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)

Review Comment:
   We only want datasets with an event in the time range.


