You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/07/28 09:28:39 UTC
[airflow] branch main updated: List upstream dataset events (#25300)
This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8b4e55f68f List upstream dataset events (#25300)
8b4e55f68f is described below
commit 8b4e55f68fda1f7f3c77dfa486177733dcfe544e
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Thu Jul 28 10:28:09 2022 +0100
List upstream dataset events (#25300)
* add upstream dataset events to run details
* show only task id link
* fix timestamps and extra field
* fix relative import path
* improve dataset events table
---
airflow/www/static/js/api/index.ts | 2 +
.../www/static/js/api/useUpstreamDatasetEvents.ts | 51 ++++++++++
airflow/www/static/js/components/Table/Cells.tsx | 61 ++++++++++++
.../js/components/{ => Table}/Table.test.tsx | 4 +-
.../js/components/{Table.tsx => Table/index.tsx} | 4 +-
airflow/www/static/js/dag/details/Dag.tsx | 2 +-
.../js/dag/details/dagRun/UpstreamEvents.tsx | 82 ++++++++++++++++
airflow/www/static/js/dag/details/dagRun/index.tsx | 4 +
.../dag/details/taskInstance/MappedInstances.tsx | 2 +-
airflow/www/static/js/datasets/Details.tsx | 103 ++++++++-------------
airflow/www/static/js/datasets/List.tsx | 7 +-
airflow/www/templates/airflow/dag.html | 1 +
12 files changed, 245 insertions(+), 78 deletions(-)
diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts
index 82758738f0..f6ca2e51dd 100644
--- a/airflow/www/static/js/api/index.ts
+++ b/airflow/www/static/js/api/index.ts
@@ -36,6 +36,7 @@ import useMappedInstances from './useMappedInstances';
import useDatasets from './useDatasets';
import useDataset from './useDataset';
import useDatasetEvents from './useDatasetEvents';
+import useUpstreamDatasetEvents from './useUpstreamDatasetEvents';
axios.interceptors.response.use(
(res: AxiosResponse) => (res.data ? camelcaseKeys(res.data, { deep: true }) : res),
@@ -60,4 +61,5 @@ export {
useDatasets,
useDataset,
useDatasetEvents,
+ useUpstreamDatasetEvents,
};
diff --git a/airflow/www/static/js/api/useUpstreamDatasetEvents.ts b/airflow/www/static/js/api/useUpstreamDatasetEvents.ts
new file mode 100644
index 0000000000..4821b4dd5f
--- /dev/null
+++ b/airflow/www/static/js/api/useUpstreamDatasetEvents.ts
@@ -0,0 +1,51 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import axios, { AxiosResponse } from 'axios';
+import { useQuery } from 'react-query';
+
+import { getMetaValue } from 'src/utils';
+import type { API } from 'src/types';
+
+/** Arguments accepted by `useUpstreamDatasetEvents`. */
+interface Props {
+  /** ID of the DAG run whose upstream dataset events are requested. */
+  runId: string;
+}
+
+/** Payload the upstream dataset events query resolves to. */
+interface UpstreamEventsData {
+  datasetEvents: API.DatasetEvent[];
+  totalEntries: number;
+}
+
+// Shared fallback used while the query has no data. Keeping a single
+// module-level object gives consumers a stable `datasetEvents` reference
+// across renders; treat it as read-only.
+const EMPTY_UPSTREAM_EVENTS: UpstreamEventsData = { datasetEvents: [], totalEntries: 0 };
+
+/**
+ * Fetch the upstream dataset events for a single DAG run.
+ *
+ * The request URL is the `upstream_dataset_events_api` meta value (or, when
+ * that is empty, a relative API path built from the `dag_id` meta value) with
+ * its `_DAG_RUN_ID_` placeholder replaced by the URL-encoded run id.
+ *
+ * @param runId - ID of the DAG run to look up; also part of the query key.
+ * @returns The react-query result, with `data` defaulting to an empty event
+ *   list while nothing has been fetched.
+ */
+export default function useUpstreamDatasetEvents({ runId }: Props) {
+  const query = useQuery(
+    ['upstreamDatasetEvents', runId],
+    () => {
+      const dagId = getMetaValue('dag_id');
+      const urlTemplate = getMetaValue('upstream_dataset_events_api')
+        || `api/v1/dags/${dagId}/dagRuns/_DAG_RUN_ID_/upstreamDatasetEvents`;
+      // Run ids may contain characters that are reserved in URLs, so encode
+      // the id before placing it in the path. A replacer function is used so
+      // the inserted text is never interpreted as a `$` replacement pattern.
+      const upstreamEventsUrl = urlTemplate.replace(
+        '_DAG_RUN_ID_',
+        () => encodeURIComponent(runId),
+      );
+      return axios.get<AxiosResponse, UpstreamEventsData>(upstreamEventsUrl);
+    },
+  );
+  return {
+    ...query,
+    data: query.data || EMPTY_UPSTREAM_EVENTS,
+  };
+}
diff --git a/airflow/www/static/js/components/Table/Cells.tsx b/airflow/www/static/js/components/Table/Cells.tsx
new file mode 100644
index 0000000000..0c06d9f7dc
--- /dev/null
+++ b/airflow/www/static/js/components/Table/Cells.tsx
@@ -0,0 +1,61 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React from 'react';
+import {
+ Code, Link, Box, Text,
+} from '@chakra-ui/react';
+
+import Time from 'src/components/Time';
+
+interface CellProps { // props shared by every cell renderer exported from this file
+ cell: {
+ value: any; // the cell's value; each renderer below interprets it differently
+ row: {
+ original: Record<string, any>; // row object the renderers read extra fields from (e.g. datasetId, sourceRunId)
+ }
+ }
+}
+
+/** Table cell that renders its value with the shared `Time` component. */
+export const TimeCell = ({ cell }: CellProps) => {
+  const { value: dateTime } = cell;
+  return <Time dateTime={dateTime} />;
+};
+
+/** Table cell linking the cell value to the datasets page, filtered by the row's `datasetId`. */
+export const DatasetLink = ({ cell: { value, row } }: CellProps) => {
+  const { datasetId } = row.original;
+  const href = `/datasets?dataset_id=${datasetId}`;
+  return <Link color="blue.600" href={href}>{value}</Link>;
+};
+
+/**
+ * Table cell for the task instance that produced a dataset event.
+ *
+ * `value` is the source task id; the source DAG id, run id and map index are
+ * read from the row. Renders a link to that task on the source DAG's `/grid`
+ * page, labelled `<dagId>.<taskId>[<mapIndex>]`, with the run id underneath.
+ * Renders nothing when the row does not identify a source task instance.
+ */
+export const TaskInstanceLink = ({ cell: { value, row } }: CellProps) => {
+  const { sourceRunId, sourceDagId, sourceMapIndex } = row.original;
+  // Without all three identifiers the link would point at "/dags/undefined/..."
+  // and the label would read "undefined.undefined", so render an empty cell.
+  if (!value || !sourceDagId || !sourceRunId) return null;
+  const url = `/dags/${sourceDagId}/grid?dag_run_id=${encodeURIComponent(sourceRunId)}&task_id=${encodeURIComponent(value)}`;
+  // `null > -1` is true in JavaScript, so rule out a missing index explicitly;
+  // otherwise the label would end in "[null]".
+  const mapIndex = sourceMapIndex != null && sourceMapIndex > -1 ? `[${sourceMapIndex}]` : '';
+  return (
+    <Box>
+      <Link color="blue.600" href={url}>{`${sourceDagId}.${value}${mapIndex}`}</Link>
+      <Text>{sourceRunId}</Text>
+    </Box>
+  );
+};
+
+/** Table cell that shows a truthy value as JSON inside `Code`; renders nothing otherwise. */
+export const CodeCell = ({ cell: { value } }: CellProps) => {
+  if (!value) return null;
+  return <Code>{JSON.stringify(value)}</Code>;
+};
diff --git a/airflow/www/static/js/components/Table.test.tsx b/airflow/www/static/js/components/Table/Table.test.tsx
similarity index 99%
rename from airflow/www/static/js/components/Table.test.tsx
rename to airflow/www/static/js/components/Table/Table.test.tsx
index e7a2f14a9b..79f713ba9f 100644
--- a/airflow/www/static/js/components/Table.test.tsx
+++ b/airflow/www/static/js/components/Table/Table.test.tsx
@@ -25,8 +25,8 @@ import { render, fireEvent, within } from '@testing-library/react';
import { sortBy } from 'lodash';
import type { SortingRule } from 'react-table';
-import Table from './Table';
-import { ChakraWrapper } from '../utils/testUtils';
+import { ChakraWrapper } from 'src/utils/testUtils';
+import { Table } from '.';
const data: Record<string, any>[] = [
{ firstName: 'Lamont', lastName: 'Grimes', country: 'United States' },
diff --git a/airflow/www/static/js/components/Table.tsx b/airflow/www/static/js/components/Table/index.tsx
similarity index 99%
rename from airflow/www/static/js/components/Table.tsx
rename to airflow/www/static/js/components/Table/index.tsx
index b539ed165c..534cc50f61 100644
--- a/airflow/www/static/js/components/Table.tsx
+++ b/airflow/www/static/js/components/Table/index.tsx
@@ -95,7 +95,7 @@ interface TableProps {
onRowClicked?: (row: Row<object>, e: any) => void;
}
-const Table = ({
+export const Table = ({
data,
columns,
manualPagination,
@@ -272,4 +272,4 @@ const Table = ({
);
};
-export default Table;
+export * from './Cells';
diff --git a/airflow/www/static/js/dag/details/Dag.tsx b/airflow/www/static/js/dag/details/Dag.tsx
index b7c4fbcf44..dc227c4bd3 100644
--- a/airflow/www/static/js/dag/details/Dag.tsx
+++ b/airflow/www/static/js/dag/details/Dag.tsx
@@ -46,7 +46,7 @@ const Dag = () => {
const { data: { dagRuns } } = useGridData();
// Build a key/value object of operator counts, the name is hidden inside of t.classRef.className
- const operators: Record<string, any> = {};
+ const operators: Record<string, number> = {};
tasks.forEach((t) => {
if (t?.classRef?.className) {
if (!operators[t.classRef.className]) {
diff --git a/airflow/www/static/js/dag/details/dagRun/UpstreamEvents.tsx b/airflow/www/static/js/dag/details/dagRun/UpstreamEvents.tsx
new file mode 100644
index 0000000000..936f5e945c
--- /dev/null
+++ b/airflow/www/static/js/dag/details/dagRun/UpstreamEvents.tsx
@@ -0,0 +1,82 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import React, { useMemo } from 'react';
+import {
+ Box, Heading, Text,
+} from '@chakra-ui/react';
+
+import {
+ CodeCell, DatasetLink, Table, TaskInstanceLink, TimeCell,
+} from 'src/components/Table';
+import { useUpstreamDatasetEvents } from 'src/api';
+import type { DagRun as DagRunType } from 'src/types';
+
+/** Props for `UpstreamEvents`. */
+interface Props {
+  /** ID of the DAG run whose upstream dataset events are listed. */
+  runId: DagRunType['runId'];
+}
+
+/**
+ * Section listing the upstream dataset events for one DAG run.
+ *
+ * Loads the events with `useUpstreamDatasetEvents` and renders them in the
+ * shared `Table` with dataset, source task instance, extra and timestamp
+ * columns.
+ */
+const UpstreamEvents = ({ runId }: Props) => {
+  const { data: { datasetEvents }, isLoading } = useUpstreamDatasetEvents({ runId });
+
+  // The column definitions never change, so build them once for the table.
+  const columns = useMemo(
+    () => [
+      {
+        Header: 'Dataset URI',
+        accessor: 'datasetUri',
+        Cell: DatasetLink,
+      },
+      {
+        Header: 'Source Task Instance',
+        accessor: 'sourceTaskId',
+        Cell: TaskInstanceLink,
+      },
+      {
+        Header: 'Extra',
+        accessor: 'extra',
+        disableSortBy: true,
+        Cell: CodeCell,
+      },
+      {
+        Header: 'When',
+        accessor: 'timestamp',
+        Cell: TimeCell,
+      },
+    ],
+    [],
+  );
+
+  return (
+    <Box mt={3}>
+      <Heading size="md">Upstream Dataset Events</Heading>
+      <Text>Updates to the upstream datasets since the last dataset-triggered DAG run</Text>
+      {/* The events array is passed through exactly as the hook returned it. */}
+      <Table
+        data={datasetEvents}
+        columns={columns}
+        isLoading={isLoading}
+      />
+    </Box>
+  );
+};
+
+export default UpstreamEvents;
diff --git a/airflow/www/static/js/dag/details/dagRun/index.tsx b/airflow/www/static/js/dag/details/dagRun/index.tsx
index 65e1a76a63..98b56aeca4 100644
--- a/airflow/www/static/js/dag/details/dagRun/index.tsx
+++ b/airflow/www/static/js/dag/details/dagRun/index.tsx
@@ -45,6 +45,7 @@ import MarkFailedRun from './MarkFailedRun';
import MarkSuccessRun from './MarkSuccessRun';
import QueueRun from './QueueRun';
import ClearRun from './ClearRun';
+import UpstreamEvents from './UpstreamEvents';
const dagId = getMetaValue('dag_id');
const graphUrl = getMetaValue('graph_url');
@@ -169,6 +170,9 @@ const DagRun = ({ runId }: Props) => {
)}
</Tbody>
</Table>
+ {runType === 'dataset_triggered' && (
+ <UpstreamEvents runId={runId} />
+ )}
</>
);
};
diff --git a/airflow/www/static/js/dag/details/taskInstance/MappedInstances.tsx b/airflow/www/static/js/dag/details/taskInstance/MappedInstances.tsx
index 288e757521..8b45a4be2c 100644
--- a/airflow/www/static/js/dag/details/taskInstance/MappedInstances.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/MappedInstances.tsx
@@ -36,7 +36,7 @@ import { getMetaValue } from 'src/utils';
import { formatDuration, getDuration } from 'src/datetime_utils';
import { useMappedInstances } from 'src/api';
import { SimpleStatus } from 'src/dag/StatusBox';
-import Table from 'src/components/Table';
+import { Table } from 'src/components/Table';
import Time from 'src/components/Time';
const canEdit = getMetaValue('can_edit') === 'True';
diff --git a/airflow/www/static/js/datasets/Details.tsx b/airflow/www/static/js/datasets/Details.tsx
index a99efdd42f..7cc16c101c 100644
--- a/airflow/www/static/js/datasets/Details.tsx
+++ b/airflow/www/static/js/datasets/Details.tsx
@@ -19,14 +19,16 @@
import React, { useMemo, useState } from 'react';
import {
- Box, Heading, Text, Code, Flex, Spinner, Button, Link,
+ Box, Heading, Text, Code, Flex, Spinner, Button,
} from '@chakra-ui/react';
import { snakeCase } from 'lodash';
import type { SortingRule } from 'react-table';
import Time from 'src/components/Time';
import { useDatasetEvents, useDataset } from 'src/api';
-import Table from 'src/components/Table';
+import {
+ Table, TimeCell, CodeCell, TaskInstanceLink,
+} from 'src/components/Table';
import { ClipboardButton } from 'src/components/Clipboard';
interface Props {
@@ -34,24 +36,10 @@ interface Props {
onBack: () => void;
}
-const TimeCell = ({ cell: { value } }: any) => <Time dateTime={value} />;
-const GridLink = ({ cell: { value } }: any) => <Link color="blue.500" href={`/dags/${value}/grid`}>{value}</Link>;
-const RunLink = ({ cell: { value, row } }: any) => {
- const { sourceDagId } = row.original;
- const url = `/dags/${sourceDagId}/grid?dag_run_id=${encodeURIComponent(value)}`;
- return (<Link color="blue.500" href={url}>{value}</Link>);
-};
-const TaskInstanceLink = ({ cell: { value, row } }: any) => {
- const { sourceRunId, sourceDagId } = row.original;
- const url = `/dags/${sourceDagId}/grid?dag_run_id=${encodeURIComponent(sourceRunId)}&task_id=${encodeURIComponent(value)}`;
- return (<Link color="blue.500" href={url}>{value}</Link>);
-};
-const CodeCell = ({ cell: { value } }: any) => <Code>{value}</Code>;
-
const DatasetDetails = ({ datasetId, onBack }: Props) => {
const limit = 25;
const [offset, setOffset] = useState(0);
- const [sortBy, setSortBy] = useState<SortingRule<object>[]>([{ id: 'createdAt', desc: true }]);
+ const [sortBy, setSortBy] = useState<SortingRule<object>[]>([{ id: 'timestamp', desc: true }]);
const sort = sortBy[0];
const order = sort ? `${sort.desc ? '-' : ''}${snakeCase(sort.id)}` : '';
@@ -67,36 +55,21 @@ const DatasetDetails = ({ datasetId, onBack }: Props) => {
const columns = useMemo(
() => [
{
- Header: 'Timestamp',
- accessor: 'timestamp',
- Cell: TimeCell,
- },
- {
- Header: 'Source DAG Id',
- accessor: 'sourceDagId',
- Cell: GridLink,
- },
- {
- Header: 'Source DAG Run Id',
- accessor: 'sourceRunId',
- Cell: RunLink,
- },
- {
- Header: 'Source Task Id',
+ Header: 'Source Task Instance',
accessor: 'sourceTaskId',
Cell: TaskInstanceLink,
},
- {
- Header: 'Source Map Index',
- accessor: 'sourceMapIndex',
- Cell: ({ cell: { value } }) => (value > -1 ? value : null),
- },
{
Header: 'Extra',
accessor: 'extra',
disableSortBy: true,
Cell: CodeCell,
},
+ {
+ Header: 'When',
+ accessor: 'timestamp',
+ Cell: TimeCell,
+ },
],
[],
);
@@ -109,35 +82,33 @@ const DatasetDetails = ({ datasetId, onBack }: Props) => {
const memoSort = useMemo(() => sortBy, [sortBy]);
return (
- <Box maxWidth="1500px">
- <Flex mt={3} justifyContent="space-between">
- {isLoading && <Spinner display="block" />}
- {!!dataset && (
- <Box>
- <Heading mb={2} fontWeight="normal">
- Dataset:
- {' '}
- {dataset.uri}
- <ClipboardButton value={dataset.uri} iconOnly ml={2} />
- </Heading>
- {!!dataset.extra && (
- <Flex>
- <Text mr={1}>Extra:</Text>
- <Code>{JSON.stringify(dataset.extra)}</Code>
- </Flex>
- )}
- <Flex my={2}>
- <Text mr={1}>Updated At:</Text>
- <Time dateTime={dataset.updatedAt} />
- </Flex>
- <Flex my={2}>
- <Text mr={1}>Created At:</Text>
- <Time dateTime={dataset.createdAt} />
+ <Box mt={[6, 3]} maxWidth="1500px">
+ <Button onClick={onBack}>See all datasets</Button>
+ {isLoading && <Spinner display="block" />}
+ {!!dataset && (
+ <Box>
+ <Heading my={2} fontWeight="normal">
+ Dataset:
+ {' '}
+ {dataset.uri}
+ <ClipboardButton value={dataset.uri} iconOnly ml={2} />
+ </Heading>
+ {!!dataset.extra && (
+ <Flex>
+ <Text mr={1}>Extra:</Text>
+ <Code>{JSON.stringify(dataset.extra)}</Code>
</Flex>
- </Box>
- )}
- <Button onClick={onBack}>See all datasets</Button>
- </Flex>
+ )}
+ <Flex my={2}>
+ <Text mr={1}>Updated At:</Text>
+ <Time dateTime={dataset.updatedAt} />
+ </Flex>
+ <Flex my={2}>
+ <Text mr={1}>Created At:</Text>
+ <Time dateTime={dataset.createdAt} />
+ </Flex>
+ </Box>
+ )}
<Heading size="lg" mt={3} mb={2} fontWeight="normal">Upstream Events</Heading>
<Text>Whenever a DAG has updated this dataset.</Text>
<Table
diff --git a/airflow/www/static/js/datasets/List.tsx b/airflow/www/static/js/datasets/List.tsx
index 99c7e2aabe..ee559bfa5a 100644
--- a/airflow/www/static/js/datasets/List.tsx
+++ b/airflow/www/static/js/datasets/List.tsx
@@ -20,24 +20,19 @@
import React, { useMemo, useState } from 'react';
import {
Box,
- Code,
Heading,
} from '@chakra-ui/react';
import { snakeCase } from 'lodash';
import type { Row, SortingRule } from 'react-table';
import { useDatasets } from 'src/api';
-import Table from 'src/components/Table';
-import Time from 'src/components/Time';
+import { Table, TimeCell, CodeCell } from 'src/components/Table';
import type { API } from 'src/types';
interface Props {
onSelect: (datasetId: string) => void;
}
-const TimeCell = ({ cell: { value } }: any) => <Time dateTime={value} />;
-const CodeCell = ({ cell: { value } }: any) => <Code>{value}</Code>;
-
const DatasetsList = ({ onSelect }: Props) => {
const limit = 25;
const [offset, setOffset] = useState(0);
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index a6f52d269b..bbbbde14e2 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -69,6 +69,7 @@
<meta name="tasks_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_task_endpoint_get_tasks', dag_id=dag.dag_id) }}">
<meta name="mapped_instances_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_get_mapped_task_instances', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}">
<meta name="task_log_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_log_endpoint_get_log', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_', task_try_number='-1') }}">
+ <meta name="upstream_dataset_events_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dag_run_endpoint_get_upstream_dataset_events', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_') }}">
<!-- End Urls -->
<meta name="is_paused" content="{{ dag_is_paused }}">
<meta name="csrf_token" content="{{ csrf_token() }}">