You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/10/18 13:10:18 UTC

[airflow] 02/41: Add last_updated_at and total_updates to datasets list view (#26358)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 48ce0eaca286f80e1c7b01ee957b877cfa89f27f
Author: blag <bl...@users.noreply.github.com>
AuthorDate: Wed Oct 5 06:51:09 2022 -0700

    Add last_updated_at and total_updates to datasets list view (#26358)
    
    * wip
    
    * add update info to datasets list in UI
    
    * wip
    
    * wip
    
    * Cleanup
    
    * Fixups and cleanups
    
    * Add support for MySQL sorting by null first/last
    
    * Add support for MSSQL sorting by null first/last
    
    * Remove unnecessary extras
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    
    * Clear dataset-related objects before and after tests
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    
    * Revert using single letters for datasets
    
    * Remove self.default_time and its uses
    
    * Betterize fixture creation
    
    * Remove comment
    
    * Add __future__.annotations import
    
    * Black formatting
    
    * Sort nulls last, using normal SQL rules
    
    * Fixup: code and tests
    
    * Add missing import
    
    * Expand tests to try to get more dataset events than correct
    
    * Fix aggregate counts for datasets with multiple producing task or multiple consuming dags
    
    * Fix COUNT DISTINCT for MySQL and MSSQL
    
    * change last update to a sortable column
    
    * server-side sorting
    
    * Entirely remove producing_task_count and consuming_dag_count from the query and response
    
    * Remove some now-unnecessary outer joins
    
    * Rename endpoint to the more accurate datasets_summary
    
    * Rename dataset summary function to match endpoint
    
    * fixup
    
    Co-authored-by: Brent Bovenzi <br...@gmail.com>
    
    Co-authored-by: Jed Cunningham <je...@apache.org>
    Co-authored-by: Brent Bovenzi <br...@gmail.com>
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    (cherry picked from commit d600cbd5fe9f0cd7101ff572190a8e5e64110e22)
---
 airflow/www/static/js/api/useDataset.ts     |   8 +-
 airflow/www/static/js/api/useDatasets.ts    |   6 +-
 airflow/www/static/js/datasets/List.tsx     |  47 +++-
 airflow/www/static/js/types/index.ts        |   6 +
 airflow/www/templates/airflow/datasets.html |   3 +-
 airflow/www/views.py                        |  76 ++++++-
 tests/www/views/test_views_dataset.py       | 332 ++++++++++++++++++++++++++++
 7 files changed, 464 insertions(+), 14 deletions(-)

diff --git a/airflow/www/static/js/api/useDataset.ts b/airflow/www/static/js/api/useDataset.ts
index 900bdbc2d1..4633b47242 100644
--- a/airflow/www/static/js/api/useDataset.ts
+++ b/airflow/www/static/js/api/useDataset.ts
@@ -23,11 +23,15 @@ import { useQuery } from 'react-query';
 import { getMetaValue } from 'src/utils';
 import type { API } from 'src/types';
 
-export default function useDataset({ uri }: API.GetDatasetVariables) {
+interface Props {
+  uri: string;
+}
+
+export default function useDataset({ uri }: Props) {
   return useQuery(
     ['dataset', uri],
     () => {
-      const datasetUrl = `${getMetaValue('datasets_api') || '/api/v1/datasets'}/${encodeURIComponent(uri)}`;
+      const datasetUrl = getMetaValue('dataset_api').replace('__URI__', encodeURIComponent(uri));
       return axios.get<AxiosResponse, API.Dataset>(datasetUrl);
     },
   );
diff --git a/airflow/www/static/js/api/useDatasets.ts b/airflow/www/static/js/api/useDatasets.ts
index 34e5bd93ec..ae92ffacbd 100644
--- a/airflow/www/static/js/api/useDatasets.ts
+++ b/airflow/www/static/js/api/useDatasets.ts
@@ -21,10 +21,10 @@ import axios, { AxiosResponse } from 'axios';
 import { useQuery } from 'react-query';
 
 import { getMetaValue } from 'src/utils';
-import type { API } from 'src/types';
+import type { DatasetListItem } from 'src/types';
 
 interface DatasetsData {
-  datasets: API.Dataset[];
+  datasets: DatasetListItem[];
   totalEntries: number;
 }
 
@@ -41,7 +41,7 @@ export default function useDatasets({
   const query = useQuery(
     ['datasets', limit, offset, order, uri],
     () => {
-      const datasetsUrl = getMetaValue('datasets_api') || '/api/v1/datasets';
+      const datasetsUrl = getMetaValue('datasets_api');
       const orderParam = order ? { order_by: order } : {};
       const uriParam = uri ? { uri_pattern: uri } : {};
       return axios.get<AxiosResponse, DatasetsData>(
diff --git a/airflow/www/static/js/datasets/List.tsx b/airflow/www/static/js/datasets/List.tsx
index 8e38cbefbd..8ac1bb7622 100644
--- a/airflow/www/static/js/datasets/List.tsx
+++ b/airflow/www/static/js/datasets/List.tsx
@@ -25,33 +25,66 @@ import {
   Text,
   Link,
 } from '@chakra-ui/react';
-import { snakeCase } from 'lodash';
 import type { Row, SortingRule } from 'react-table';
 
 import { useDatasets } from 'src/api';
-import { Table } from 'src/components/Table';
+import { Table, TimeCell } from 'src/components/Table';
 import type { API } from 'src/types';
 import { getMetaValue } from 'src/utils';
+import { snakeCase } from 'lodash';
 
 interface Props {
   onSelect: (datasetId: string) => void;
 }
 
+interface CellProps {
+  cell: {
+    value: any;
+    row: {
+      original: Record<string, any>;
+    }
+  }
+}
+
+const DetailCell = ({ cell: { row } }: CellProps) => {
+  const { totalUpdates, uri } = row.original;
+  return (
+    <Box>
+      <Text>{uri}</Text>
+      <Text fontSize="sm" mt={2}>
+        Total Updates:
+        {' '}
+        {totalUpdates}
+      </Text>
+    </Box>
+  );
+};
+
 const DatasetsList = ({ onSelect }: Props) => {
   const limit = 25;
   const [offset, setOffset] = useState(0);
-  const [sortBy, setSortBy] = useState<SortingRule<object>[]>([]);
+  const [sortBy, setSortBy] = useState<SortingRule<object>[]>([{ id: 'lastDatasetUpdate', desc: true }]);
 
   const sort = sortBy[0];
   const order = sort ? `${sort.desc ? '-' : ''}${snakeCase(sort.id)}` : '';
 
-  const { data: { datasets, totalEntries }, isLoading } = useDatasets({ limit, offset, order });
+  const { data: { datasets, totalEntries }, isLoading } = useDatasets({
+    limit,
+    offset,
+    order,
+  });
 
   const columns = useMemo(
     () => [
       {
         Header: 'URI',
         accessor: 'uri',
+        Cell: DetailCell,
+      },
+      {
+        Header: 'Last Update',
+        accessor: 'lastDatasetUpdate',
+        Cell: TimeCell,
       },
     ],
     [],
@@ -61,6 +94,7 @@ const DatasetsList = ({ onSelect }: Props) => {
     () => datasets,
     [datasets],
   );
+  const memoSort = useMemo(() => sortBy, [sortBy]);
 
   const onDatasetSelect = (row: Row<API.Dataset>) => {
     if (row.original.uri) onSelect(row.original.uri);
@@ -76,7 +110,7 @@ const DatasetsList = ({ onSelect }: Props) => {
         </Heading>
       </Flex>
       {!datasets.length && !isLoading && (
-        <Text>
+        <Text mb={4}>
           Looks like you do not have any datasets yet. Check out the
           {' '}
           <Link color="blue" href={docsUrl} isExternal>docs</Link>
@@ -94,11 +128,12 @@ const DatasetsList = ({ onSelect }: Props) => {
             setOffset,
             totalEntries,
           }}
-          pageSize={limit}
           manualSort={{
             setSortBy,
             sortBy,
+            initialSortBy: memoSort,
           }}
+          pageSize={limit}
           onRowClicked={onDatasetSelect}
         />
       </Box>
diff --git a/airflow/www/static/js/types/index.ts b/airflow/www/static/js/types/index.ts
index d1386b8ee1..bf0f7d9b9c 100644
--- a/airflow/www/static/js/types/index.ts
+++ b/airflow/www/static/js/types/index.ts
@@ -97,6 +97,11 @@ interface DepEdge {
   v: string;
 }
 
+interface DatasetListItem extends API.Dataset {
+  lastDatasetUpdate: string | null;
+  totalUpdates: number;
+}
+
 export type {
   Dag,
   DagRun,
@@ -108,4 +113,5 @@ export type {
   DepEdge,
   API,
   RunOrdering,
+  DatasetListItem,
 };
diff --git a/airflow/www/templates/airflow/datasets.html b/airflow/www/templates/airflow/datasets.html
index 36c7a9b689..6b3555c616 100644
--- a/airflow/www/templates/airflow/datasets.html
+++ b/airflow/www/templates/airflow/datasets.html
@@ -22,7 +22,8 @@
 
 {% block head_meta %}
   {{ super() }}
-  <meta name="datasets_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dataset_endpoint_get_datasets') }}">
+  <meta name="datasets_api" content="{{ url_for('Airflow.datasets_summary') }}">
+  <meta name="dataset_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dataset_endpoint_get_dataset', uri='__URI__') }}">
   <meta name="dataset_events_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dataset_endpoint_get_dataset_events') }}">
   <meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id='__DAG_ID__') }}">
   <meta name="datasets_docs" content="{{ get_docs_url('concepts/datasets.html') }}">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b1d3c1209b..3fe94e0fd9 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -71,7 +71,7 @@ from pendulum.datetime import DateTime
 from pendulum.parsing.exceptions import ParserError
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
-from sqlalchemy import Date, and_, desc, func, inspect, union_all
+from sqlalchemy import Date, and_, desc, distinct, func, inspect, union_all
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import Session, joinedload
 from wtforms import SelectField, validators
@@ -108,7 +108,7 @@ from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
 from airflow.timetables.base import DataInterval, TimeRestriction
 from airflow.timetables.interval import CronDataIntervalTimetable
-from airflow.utils import timezone, yaml
+from airflow.utils import json as utils_json, timezone, yaml
 from airflow.utils.airflow_flask_app import get_airflow_app
 from airflow.utils.dag_edges import dag_edges
 from airflow.utils.dates import infer_time_unit, scale_time_units
@@ -3547,6 +3547,78 @@ class Airflow(AirflowBaseView):
             {'Content-Type': 'application/json; charset=utf-8'},
         )
 
+    @expose('/object/datasets_summary')
+    @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET)])
+    def datasets_summary(self):
+        """Get a summary of datasets, including the datetime they were last updated and how many updates
+        they've ever had
+        """
+        allowed_attrs = ['uri', 'last_dataset_update']
+
+        limit = int(request.args.get("limit", 25))
+        offset = int(request.args.get("offset", 0))
+        order_by = request.args.get("order_by", "uri")
+        lstripped_orderby = order_by.lstrip('-')
+
+        if lstripped_orderby not in allowed_attrs:
+            return {
+                "detail": (
+                    f"Ordering with '{lstripped_orderby}' is disallowed or the attribute does not "
+                    "exist on the model"
+                )
+            }, 400
+
+        limit = 50 if limit > 50 else limit
+
+        with create_session() as session:
+            if lstripped_orderby == "uri":
+                if order_by[0] == "-":
+                    order_by = (DatasetModel.uri.desc(),)
+                else:
+                    order_by = (DatasetModel.uri.asc(),)
+            elif lstripped_orderby == "last_dataset_update":
+                if order_by[0] == "-":
+                    order_by = (
+                        func.max(DatasetEvent.timestamp).desc(),
+                        DatasetModel.uri.asc(),
+                    )
+                    if session.bind.dialect.name == "postgresql":
+                        order_by = (order_by[0].nulls_last(), *order_by[1:])
+                else:
+                    order_by = (
+                        func.max(DatasetEvent.timestamp).asc(),
+                        DatasetModel.uri.desc(),
+                    )
+                    if session.bind.dialect.name == "postgresql":
+                        order_by = (order_by[0].nulls_first(), *order_by[1:])
+
+            total_entries = session.query(func.count(DatasetModel.id)).scalar()
+
+            datasets = [
+                dict(dataset)
+                for dataset in session.query(
+                    DatasetModel.id,
+                    DatasetModel.uri,
+                    func.max(DatasetEvent.timestamp).label("last_dataset_update"),
+                    func.count(distinct(DatasetEvent.id)).label("total_updates"),
+                )
+                .outerjoin(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
+                .group_by(
+                    DatasetModel.id,
+                    DatasetModel.uri,
+                )
+                .order_by(*order_by)
+                .offset(offset)
+                .limit(limit)
+                .all()
+            ]
+            data = {"datasets": datasets, "total_entries": total_entries}
+
+            return (
+                htmlsafe_json_dumps(data, separators=(',', ':'), cls=utils_json.AirflowJsonEncoder),
+                {'Content-Type': 'application/json; charset=utf-8'},
+            )
+
     @expose('/robots.txt')
     @action_logging
     def robots(self):
diff --git a/tests/www/views/test_views_dataset.py b/tests/www/views/test_views_dataset.py
new file mode 100644
index 0000000000..c67298876d
--- /dev/null
+++ b/tests/www/views/test_views_dataset.py
@@ -0,0 +1,332 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pendulum
+import pytest
+from dateutil.tz import UTC
+
+from airflow import Dataset
+from airflow.models.dataset import DatasetEvent, DatasetModel
+from airflow.operators.empty import EmptyOperator
+from tests.test_utils.asserts import assert_queries_count
+from tests.test_utils.db import clear_db_datasets
+
+
+class TestDatasetEndpoint:
+    @pytest.fixture(autouse=True)
+    def cleanup(self):
+        clear_db_datasets()
+        yield
+        clear_db_datasets()
+
+
+class TestGetDatasets(TestDatasetEndpoint):
+    def test_should_respond_200(self, admin_client, session):
+        datasets = [
+            DatasetModel(
+                id=i,
+                uri=f"s3://bucket/key/{i}",
+            )
+            for i in [1, 2]
+        ]
+        session.add_all(datasets)
+        session.commit()
+        assert session.query(DatasetModel).count() == 2
+
+        with assert_queries_count(8):
+            response = admin_client.get("/object/datasets_summary")
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "datasets": [
+                {
+                    "id": 1,
+                    "uri": "s3://bucket/key/1",
+                    "last_dataset_update": None,
+                    "total_updates": 0,
+                },
+                {
+                    "id": 2,
+                    "uri": "s3://bucket/key/2",
+                    "last_dataset_update": None,
+                    "total_updates": 0,
+                },
+            ],
+            "total_entries": 2,
+        }
+
+    def test_order_by_raises_400_for_invalid_attr(self, admin_client, session):
+        datasets = [
+            DatasetModel(
+                uri=f"s3://bucket/key/{i}",
+            )
+            for i in [1, 2]
+        ]
+        session.add_all(datasets)
+        session.commit()
+        assert session.query(DatasetModel).count() == 2
+
+        response = admin_client.get("/object/datasets_summary?order_by=fake")
+
+        assert response.status_code == 400
+        msg = "Ordering with 'fake' is disallowed or the attribute does not exist on the model"
+        assert response.json['detail'] == msg
+
+    @pytest.mark.parametrize(
+        "order_by, ordered_dataset_ids",
+        [
+            ("uri", [1, 2, 3, 4]),
+            ("-uri", [4, 3, 2, 1]),
+            ("last_dataset_update", [4, 1, 3, 2]),
+            ("-last_dataset_update", [2, 3, 1, 4]),
+        ],
+    )
+    def test_order_by(self, admin_client, session, order_by, ordered_dataset_ids):
+        datasets = [
+            DatasetModel(
+                id=i,
+                uri=f"s3://bucket/key/{i}",
+            )
+            for i in range(1, len(ordered_dataset_ids) + 1)
+        ]
+        session.add_all(datasets)
+        dataset_events = [
+            DatasetEvent(
+                dataset_id=datasets[2].id,
+                timestamp=pendulum.today('UTC').add(days=-3),
+            ),
+            DatasetEvent(
+                dataset_id=datasets[1].id,
+                timestamp=pendulum.today('UTC').add(days=-2),
+            ),
+            DatasetEvent(
+                dataset_id=datasets[1].id,
+                timestamp=pendulum.today('UTC').add(days=-1),
+            ),
+        ]
+        session.add_all(dataset_events)
+        session.commit()
+        assert session.query(DatasetModel).count() == len(ordered_dataset_ids)
+
+        response = admin_client.get(f"/object/datasets_summary?order_by={order_by}")
+
+        assert response.status_code == 200
+        assert ordered_dataset_ids == [json_dict['id'] for json_dict in response.json['datasets']]
+        assert response.json['total_entries'] == len(ordered_dataset_ids)
+
+    @pytest.mark.need_serialized_dag
+    def test_correct_counts_update(self, admin_client, session, dag_maker, app, monkeypatch):
+        with monkeypatch.context() as m:
+            datasets = [Dataset(uri=f"s3://bucket/key/{i}") for i in [1, 2, 3, 4, 5]]
+
+            # DAG that produces dataset #1
+            with dag_maker(dag_id='upstream', schedule=None, serialized=True, session=session):
+                EmptyOperator(task_id='task1', outlets=[datasets[0]])
+
+            # DAG that consumes only datasets #1 and #2
+            with dag_maker(dag_id='downstream', schedule=datasets[:2], serialized=True, session=session):
+                EmptyOperator(task_id='task1')
+
+            # We create multiple dataset-producing and dataset-consuming DAGs because the query requires
+            # COUNT(DISTINCT ...) for total_updates, or else it returns a multiple of the correct number due
+            # to the outer joins with DagScheduleDatasetReference and TaskOutletDatasetReference
+            # Two independent DAGs that produce dataset #3
+            with dag_maker(dag_id='independent_producer_1', serialized=True, session=session):
+                EmptyOperator(task_id='task1', outlets=[datasets[2]])
+            with dag_maker(dag_id='independent_producer_2', serialized=True, session=session):
+                EmptyOperator(task_id='task1', outlets=[datasets[2]])
+            # Two independent DAGs that consume dataset #4
+            with dag_maker(
+                dag_id='independent_consumer_1',
+                schedule=[datasets[3]],
+                serialized=True,
+                session=session,
+            ):
+                EmptyOperator(task_id='task1')
+            with dag_maker(
+                dag_id='independent_consumer_2',
+                schedule=[datasets[3]],
+                serialized=True,
+                session=session,
+            ):
+                EmptyOperator(task_id='task1')
+
+            # Independent DAG that both produces and consumes the same dataset, #5
+            with dag_maker(
+                dag_id='independent_producer_self_consumer',
+                schedule=[datasets[4]],
+                serialized=True,
+                session=session,
+            ):
+                EmptyOperator(task_id='task1', outlets=[datasets[4]])
+
+            m.setattr(app, 'dag_bag', dag_maker.dagbag)
+
+            ds1_id = session.query(DatasetModel.id).filter_by(uri=datasets[0].uri).scalar()
+            ds2_id = session.query(DatasetModel.id).filter_by(uri=datasets[1].uri).scalar()
+            ds3_id = session.query(DatasetModel.id).filter_by(uri=datasets[2].uri).scalar()
+            ds4_id = session.query(DatasetModel.id).filter_by(uri=datasets[3].uri).scalar()
+            ds5_id = session.query(DatasetModel.id).filter_by(uri=datasets[4].uri).scalar()
+
+            # dataset 1 events
+            session.add_all(
+                [
+                    DatasetEvent(
+                        dataset_id=ds1_id,
+                        timestamp=pendulum.DateTime(2022, 8, 1, i, tzinfo=UTC),
+                    )
+                    for i in range(3)
+                ]
+            )
+            # dataset 3 events
+            session.add_all(
+                [
+                    DatasetEvent(
+                        dataset_id=ds3_id,
+                        timestamp=pendulum.DateTime(2022, 8, 1, i, tzinfo=UTC),
+                    )
+                    for i in range(3)
+                ]
+            )
+            # dataset 4 events
+            session.add_all(
+                [
+                    DatasetEvent(
+                        dataset_id=ds4_id,
+                        timestamp=pendulum.DateTime(2022, 8, 1, i, tzinfo=UTC),
+                    )
+                    for i in range(4)
+                ]
+            )
+            # dataset 5 events
+            session.add_all(
+                [
+                    DatasetEvent(
+                        dataset_id=ds5_id,
+                        timestamp=pendulum.DateTime(2022, 8, 1, i, tzinfo=UTC),
+                    )
+                    for i in range(5)
+                ]
+            )
+            session.commit()
+
+            response = admin_client.get("/object/datasets_summary")
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "datasets": [
+                {
+                    "id": ds1_id,
+                    "uri": "s3://bucket/key/1",
+                    "last_dataset_update": "2022-08-01T02:00:00+00:00",
+                    "total_updates": 3,
+                },
+                {
+                    "id": ds2_id,
+                    "uri": "s3://bucket/key/2",
+                    "last_dataset_update": None,
+                    "total_updates": 0,
+                },
+                {
+                    "id": ds3_id,
+                    "uri": "s3://bucket/key/3",
+                    "last_dataset_update": "2022-08-01T02:00:00+00:00",
+                    "total_updates": 3,
+                },
+                {
+                    "id": ds4_id,
+                    "uri": "s3://bucket/key/4",
+                    "last_dataset_update": "2022-08-01T03:00:00+00:00",
+                    "total_updates": 4,
+                },
+                {
+                    "id": ds5_id,
+                    "uri": "s3://bucket/key/5",
+                    "last_dataset_update": "2022-08-01T04:00:00+00:00",
+                    "total_updates": 5,
+                },
+            ],
+            "total_entries": 5,
+        }
+
+
+class TestGetDatasetsEndpointPagination(TestDatasetEndpoint):
+    @pytest.mark.parametrize(
+        "url, expected_dataset_uris",
+        [
+            # Limit test data
+            ("/object/datasets_summary?limit=1", ["s3://bucket/key/1"]),
+            ("/object/datasets_summary?limit=5", [f"s3://bucket/key/{i}" for i in range(1, 6)]),
+            # Offset test data
+            ("/object/datasets_summary?offset=1", [f"s3://bucket/key/{i}" for i in range(2, 10)]),
+            ("/object/datasets_summary?offset=3", [f"s3://bucket/key/{i}" for i in range(4, 10)]),
+            # Limit and offset test data
+            ("/object/datasets_summary?offset=3&limit=3", [f"s3://bucket/key/{i}" for i in [4, 5, 6]]),
+        ],
+    )
+    def test_limit_and_offset(self, admin_client, session, url, expected_dataset_uris):
+        datasets = [
+            DatasetModel(
+                uri=f"s3://bucket/key/{i}",
+                extra={"foo": "bar"},
+            )
+            for i in range(1, 10)
+        ]
+        session.add_all(datasets)
+        session.commit()
+
+        response = admin_client.get(url)
+
+        assert response.status_code == 200
+        dataset_uris = [dataset["uri"] for dataset in response.json["datasets"]]
+        assert dataset_uris == expected_dataset_uris
+
+    def test_should_respect_page_size_limit_default(self, admin_client, session):
+        datasets = [
+            DatasetModel(
+                uri=f"s3://bucket/key/{i}",
+                extra={"foo": "bar"},
+            )
+            for i in range(1, 60)
+        ]
+        session.add_all(datasets)
+        session.commit()
+
+        response = admin_client.get("/object/datasets_summary")
+
+        assert response.status_code == 200
+        assert len(response.json['datasets']) == 25
+
+    def test_should_return_max_if_req_above(self, admin_client, session):
+        datasets = [
+            DatasetModel(
+                uri=f"s3://bucket/key/{i}",
+                extra={"foo": "bar"},
+            )
+            for i in range(1, 60)
+        ]
+        session.add_all(datasets)
+        session.commit()
+
+        response = admin_client.get("/object/datasets_summary?limit=180")
+
+        assert response.status_code == 200
+        assert len(response.json['datasets']) == 50