You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/07/28 09:28:39 UTC

[airflow] branch main updated: List upstream dataset events (#25300)

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b4e55f68f List upstream dataset events (#25300)
8b4e55f68f is described below

commit 8b4e55f68fda1f7f3c77dfa486177733dcfe544e
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Thu Jul 28 10:28:09 2022 +0100

    List upstream dataset events (#25300)
    
    * add upstream dataset events to run details
    
    * show only task id link
    
    * fix timestamps and extra field
    
    * fix relative import path
    
    * improve dataset events table
---
 airflow/www/static/js/api/index.ts                 |   2 +
 .../www/static/js/api/useUpstreamDatasetEvents.ts  |  51 ++++++++++
 airflow/www/static/js/components/Table/Cells.tsx   |  61 ++++++++++++
 .../js/components/{ => Table}/Table.test.tsx       |   4 +-
 .../js/components/{Table.tsx => Table/index.tsx}   |   4 +-
 airflow/www/static/js/dag/details/Dag.tsx          |   2 +-
 .../js/dag/details/dagRun/UpstreamEvents.tsx       |  82 ++++++++++++++++
 airflow/www/static/js/dag/details/dagRun/index.tsx |   4 +
 .../dag/details/taskInstance/MappedInstances.tsx   |   2 +-
 airflow/www/static/js/datasets/Details.tsx         | 103 ++++++++-------------
 airflow/www/static/js/datasets/List.tsx            |   7 +-
 airflow/www/templates/airflow/dag.html             |   1 +
 12 files changed, 245 insertions(+), 78 deletions(-)

diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts
index 82758738f0..f6ca2e51dd 100644
--- a/airflow/www/static/js/api/index.ts
+++ b/airflow/www/static/js/api/index.ts
@@ -36,6 +36,7 @@ import useMappedInstances from './useMappedInstances';
 import useDatasets from './useDatasets';
 import useDataset from './useDataset';
 import useDatasetEvents from './useDatasetEvents';
+import useUpstreamDatasetEvents from './useUpstreamDatasetEvents';
 
 axios.interceptors.response.use(
   (res: AxiosResponse) => (res.data ? camelcaseKeys(res.data, { deep: true }) : res),
@@ -60,4 +61,5 @@ export {
   useDatasets,
   useDataset,
   useDatasetEvents,
+  useUpstreamDatasetEvents,
 };
diff --git a/airflow/www/static/js/api/useUpstreamDatasetEvents.ts b/airflow/www/static/js/api/useUpstreamDatasetEvents.ts
new file mode 100644
index 0000000000..4821b4dd5f
--- /dev/null
+++ b/airflow/www/static/js/api/useUpstreamDatasetEvents.ts
@@ -0,0 +1,69 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import axios, { AxiosResponse } from 'axios';
+import { useQuery } from 'react-query';
+
+import { getMetaValue } from 'src/utils';
+import type { API } from 'src/types';
+
+interface Props {
+  runId: string;
+}
+
+interface UpstreamEventsData {
+  datasetEvents: API.DatasetEvent[];
+  totalEntries: number;
+}
+
+// Module-level fallback so callers get the same object (and the same
+// `datasetEvents` array) on every render until a response arrives, instead of
+// a fresh literal that defeats memoization in consumers.
+const emptyData: UpstreamEventsData = { datasetEvents: [], totalEntries: 0 };
+
+/**
+ * Fetch the upstream dataset events of one DAG run of the current DAG.
+ *
+ * The endpoint URL is read from the `upstream_dataset_events_api` meta tag,
+ * falling back to a relative URL built from the `dag_id` meta tag. Its
+ * `_DAG_RUN_ID_` placeholder is replaced with the URL-encoded run id, because
+ * run ids may contain characters (e.g. `?`, `#`, `%`, spaces) that would
+ * otherwise corrupt the request path.
+ *
+ * @param props.runId - Run id of the DAG run to look up.
+ * @returns The react-query result, whose `data` is an empty event list with
+ *   `totalEntries` of 0 until a response is available.
+ */
+export default function useUpstreamDatasetEvents({ runId }: Props) {
+  const query = useQuery(
+    ['upstreamDatasetEvents', runId],
+    () => {
+      const dagId = getMetaValue('dag_id');
+      const upstreamEventsUrl = (
+        getMetaValue('upstream_dataset_events_api')
+          || `api/v1/dags/${dagId}/dagRuns/_DAG_RUN_ID_/upstreamDatasetEvents`
+      ).replace('_DAG_RUN_ID_', encodeURIComponent(runId));
+      return axios.get<AxiosResponse, UpstreamEventsData>(upstreamEventsUrl);
+    },
+  );
+  return {
+    ...query,
+    data: query.data || emptyData,
+  };
+}
diff --git a/airflow/www/static/js/components/Table/Cells.tsx b/airflow/www/static/js/components/Table/Cells.tsx
new file mode 100644
index 0000000000..0c06d9f7dc
--- /dev/null
+++ b/airflow/www/static/js/components/Table/Cells.tsx
@@ -0,0 +1,85 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React from 'react';
+import {
+  Code, Link, Box, Text,
+} from '@chakra-ui/react';
+
+import Time from 'src/components/Time';
+
+/**
+ * The part of the react-table cell renderer props these cells read.
+ * `row.original` is the data record the row was built from.
+ */
+interface CellProps {
+  cell: {
+    // Values differ per column (timestamps, ids, `extra` objects), hence `any`.
+    value: any;
+    row: {
+      original: Record<string, any>;
+    }
+  }
+}
+
+/** Render the cell value through the shared `Time` component. */
+export const TimeCell = ({ cell }: CellProps) => {
+  const { value: dateTime } = cell;
+  return <Time dateTime={dateTime} />;
+};
+
+/** Link the cell value to `/datasets?dataset_id=<row.original.datasetId>`. */
+export const DatasetLink = ({ cell: { value, row } }: CellProps) => {
+  const { datasetId } = row.original;
+  return (
+    <Link color="blue.600" href={`/datasets?dataset_id=${datasetId}`}>
+      {value}
+    </Link>
+  );
+};
+
+/**
+ * Render the source task instance as `dagId.taskId[mapIndex]`, linking to that
+ * task in the source DAG's grid view, followed by the source run id.
+ * The `[mapIndex]` suffix is only added when `sourceMapIndex` is > -1.
+ */
+export const TaskInstanceLink = ({ cell: { value: taskId, row } }: CellProps) => {
+  const { sourceRunId, sourceDagId, sourceMapIndex } = row.original;
+  const search = [
+    `dag_run_id=${encodeURIComponent(sourceRunId)}`,
+    `task_id=${encodeURIComponent(taskId)}`,
+  ].join('&');
+  const href = `/dags/${sourceDagId}/grid?${search}`;
+  const suffix = sourceMapIndex > -1 ? `[${sourceMapIndex}]` : '';
+  const label = `${sourceDagId}.${taskId}${suffix}`;
+  return (
+    <Box>
+      <Link color="blue.600" href={href}>{label}</Link>
+      <Text>{sourceRunId}</Text>
+    </Box>
+  );
+};
+
+/** Render a truthy cell value as JSON inside `Code`; render nothing otherwise. */
+export const CodeCell = ({ cell: { value } }: CellProps) => {
+  if (!value) {
+    return null;
+  }
+  return <Code>{JSON.stringify(value)}</Code>;
+};
diff --git a/airflow/www/static/js/components/Table.test.tsx b/airflow/www/static/js/components/Table/Table.test.tsx
similarity index 99%
rename from airflow/www/static/js/components/Table.test.tsx
rename to airflow/www/static/js/components/Table/Table.test.tsx
index e7a2f14a9b..79f713ba9f 100644
--- a/airflow/www/static/js/components/Table.test.tsx
+++ b/airflow/www/static/js/components/Table/Table.test.tsx
@@ -25,8 +25,8 @@ import { render, fireEvent, within } from '@testing-library/react';
 import { sortBy } from 'lodash';
 import type { SortingRule } from 'react-table';
 
-import Table from './Table';
-import { ChakraWrapper } from '../utils/testUtils';
+import { ChakraWrapper } from 'src/utils/testUtils';
+import { Table } from '.';
 
 const data: Record<string, any>[] = [
   { firstName: 'Lamont', lastName: 'Grimes', country: 'United States' },
diff --git a/airflow/www/static/js/components/Table.tsx b/airflow/www/static/js/components/Table/index.tsx
similarity index 99%
rename from airflow/www/static/js/components/Table.tsx
rename to airflow/www/static/js/components/Table/index.tsx
index b539ed165c..534cc50f61 100644
--- a/airflow/www/static/js/components/Table.tsx
+++ b/airflow/www/static/js/components/Table/index.tsx
@@ -95,7 +95,7 @@ interface TableProps {
   onRowClicked?: (row: Row<object>, e: any) => void;
 }
 
-const Table = ({
+export const Table = ({
   data,
   columns,
   manualPagination,
@@ -272,4 +272,4 @@ const Table = ({
   );
 };
 
-export default Table;
+export * from './Cells';
diff --git a/airflow/www/static/js/dag/details/Dag.tsx b/airflow/www/static/js/dag/details/Dag.tsx
index b7c4fbcf44..dc227c4bd3 100644
--- a/airflow/www/static/js/dag/details/Dag.tsx
+++ b/airflow/www/static/js/dag/details/Dag.tsx
@@ -46,7 +46,7 @@ const Dag = () => {
   const { data: { dagRuns } } = useGridData();
 
   // Build a key/value object of operator counts, the name is hidden inside of t.classRef.className
-  const operators: Record<string, any> = {};
+  const operators: Record<string, number> = {};
   tasks.forEach((t) => {
     if (t?.classRef?.className) {
       if (!operators[t.classRef.className]) {
diff --git a/airflow/www/static/js/dag/details/dagRun/UpstreamEvents.tsx b/airflow/www/static/js/dag/details/dagRun/UpstreamEvents.tsx
new file mode 100644
index 0000000000..936f5e945c
--- /dev/null
+++ b/airflow/www/static/js/dag/details/dagRun/UpstreamEvents.tsx
@@ -0,0 +1,83 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import React, { useMemo } from 'react';
+import {
+  Box, Heading, Text,
+} from '@chakra-ui/react';
+
+import {
+  CodeCell, DatasetLink, Table, TaskInstanceLink, TimeCell,
+} from 'src/components/Table';
+import { useUpstreamDatasetEvents } from 'src/api';
+import type { DagRun as DagRunType } from 'src/types';
+
+interface Props {
+  runId: DagRunType['runId'];
+}
+
+/**
+ * Section listing the upstream dataset events of one DAG run: the dataset
+ * URI, the task instance that emitted the event, its `extra` payload and its
+ * timestamp.
+ */
+const UpstreamEvents = ({ runId }: Props) => {
+  const { data: { datasetEvents }, isLoading } = useUpstreamDatasetEvents({ runId });
+
+  // react-table needs memoized column definitions; these never change.
+  const columns = useMemo(
+    () => [
+      {
+        Header: 'Dataset URI',
+        accessor: 'datasetUri',
+        Cell: DatasetLink,
+      },
+      {
+        Header: 'Source Task Instance',
+        accessor: 'sourceTaskId',
+        Cell: TaskInstanceLink,
+      },
+      {
+        Header: 'Extra',
+        accessor: 'extra',
+        disableSortBy: true,
+        Cell: CodeCell,
+      },
+      {
+        Header: 'When',
+        accessor: 'timestamp',
+        Cell: TimeCell,
+      },
+    ],
+    [],
+  );
+
+  return (
+    <Box mt={3}>
+      <Heading size="md">Upstream Dataset Events</Heading>
+      <Text>Updates to the upstream datasets since the last dataset-triggered DAG run</Text>
+      <Table
+        data={datasetEvents}
+        columns={columns}
+        isLoading={isLoading}
+      />
+    </Box>
+  );
+};
+
+export default UpstreamEvents;
diff --git a/airflow/www/static/js/dag/details/dagRun/index.tsx b/airflow/www/static/js/dag/details/dagRun/index.tsx
index 65e1a76a63..98b56aeca4 100644
--- a/airflow/www/static/js/dag/details/dagRun/index.tsx
+++ b/airflow/www/static/js/dag/details/dagRun/index.tsx
@@ -45,6 +45,7 @@ import MarkFailedRun from './MarkFailedRun';
 import MarkSuccessRun from './MarkSuccessRun';
 import QueueRun from './QueueRun';
 import ClearRun from './ClearRun';
+import UpstreamEvents from './UpstreamEvents';
 
 const dagId = getMetaValue('dag_id');
 const graphUrl = getMetaValue('graph_url');
@@ -169,6 +170,9 @@ const DagRun = ({ runId }: Props) => {
           )}
         </Tbody>
       </Table>
+      {runType === 'dataset_triggered' && (
+        <UpstreamEvents runId={runId} />
+      )}
     </>
   );
 };
diff --git a/airflow/www/static/js/dag/details/taskInstance/MappedInstances.tsx b/airflow/www/static/js/dag/details/taskInstance/MappedInstances.tsx
index 288e757521..8b45a4be2c 100644
--- a/airflow/www/static/js/dag/details/taskInstance/MappedInstances.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/MappedInstances.tsx
@@ -36,7 +36,7 @@ import { getMetaValue } from 'src/utils';
 import { formatDuration, getDuration } from 'src/datetime_utils';
 import { useMappedInstances } from 'src/api';
 import { SimpleStatus } from 'src/dag/StatusBox';
-import Table from 'src/components/Table';
+import { Table } from 'src/components/Table';
 import Time from 'src/components/Time';
 
 const canEdit = getMetaValue('can_edit') === 'True';
diff --git a/airflow/www/static/js/datasets/Details.tsx b/airflow/www/static/js/datasets/Details.tsx
index a99efdd42f..7cc16c101c 100644
--- a/airflow/www/static/js/datasets/Details.tsx
+++ b/airflow/www/static/js/datasets/Details.tsx
@@ -19,14 +19,16 @@
 
 import React, { useMemo, useState } from 'react';
 import {
-  Box, Heading, Text, Code, Flex, Spinner, Button, Link,
+  Box, Heading, Text, Code, Flex, Spinner, Button,
 } from '@chakra-ui/react';
 import { snakeCase } from 'lodash';
 import type { SortingRule } from 'react-table';
 
 import Time from 'src/components/Time';
 import { useDatasetEvents, useDataset } from 'src/api';
-import Table from 'src/components/Table';
+import {
+  Table, TimeCell, CodeCell, TaskInstanceLink,
+} from 'src/components/Table';
 import { ClipboardButton } from 'src/components/Clipboard';
 
 interface Props {
@@ -34,24 +36,10 @@ interface Props {
   onBack: () => void;
 }
 
-const TimeCell = ({ cell: { value } }: any) => <Time dateTime={value} />;
-const GridLink = ({ cell: { value } }: any) => <Link color="blue.500" href={`/dags/${value}/grid`}>{value}</Link>;
-const RunLink = ({ cell: { value, row } }: any) => {
-  const { sourceDagId } = row.original;
-  const url = `/dags/${sourceDagId}/grid?dag_run_id=${encodeURIComponent(value)}`;
-  return (<Link color="blue.500" href={url}>{value}</Link>);
-};
-const TaskInstanceLink = ({ cell: { value, row } }: any) => {
-  const { sourceRunId, sourceDagId } = row.original;
-  const url = `/dags/${sourceDagId}/grid?dag_run_id=${encodeURIComponent(sourceRunId)}&task_id=${encodeURIComponent(value)}`;
-  return (<Link color="blue.500" href={url}>{value}</Link>);
-};
-const CodeCell = ({ cell: { value } }: any) => <Code>{value}</Code>;
-
 const DatasetDetails = ({ datasetId, onBack }: Props) => {
   const limit = 25;
   const [offset, setOffset] = useState(0);
-  const [sortBy, setSortBy] = useState<SortingRule<object>[]>([{ id: 'createdAt', desc: true }]);
+  const [sortBy, setSortBy] = useState<SortingRule<object>[]>([{ id: 'timestamp', desc: true }]);
 
   const sort = sortBy[0];
   const order = sort ? `${sort.desc ? '-' : ''}${snakeCase(sort.id)}` : '';
@@ -67,36 +55,21 @@ const DatasetDetails = ({ datasetId, onBack }: Props) => {
   const columns = useMemo(
     () => [
       {
-        Header: 'Timestamp',
-        accessor: 'timestamp',
-        Cell: TimeCell,
-      },
-      {
-        Header: 'Source DAG Id',
-        accessor: 'sourceDagId',
-        Cell: GridLink,
-      },
-      {
-        Header: 'Source DAG Run Id',
-        accessor: 'sourceRunId',
-        Cell: RunLink,
-      },
-      {
-        Header: 'Source Task Id',
+        Header: 'Source Task Instance',
         accessor: 'sourceTaskId',
         Cell: TaskInstanceLink,
       },
-      {
-        Header: 'Source Map Index',
-        accessor: 'sourceMapIndex',
-        Cell: ({ cell: { value } }) => (value > -1 ? value : null),
-      },
       {
         Header: 'Extra',
         accessor: 'extra',
         disableSortBy: true,
         Cell: CodeCell,
       },
+      {
+        Header: 'When',
+        accessor: 'timestamp',
+        Cell: TimeCell,
+      },
     ],
     [],
   );
@@ -109,35 +82,33 @@ const DatasetDetails = ({ datasetId, onBack }: Props) => {
   const memoSort = useMemo(() => sortBy, [sortBy]);
 
   return (
-    <Box maxWidth="1500px">
-      <Flex mt={3} justifyContent="space-between">
-        {isLoading && <Spinner display="block" />}
-        {!!dataset && (
-          <Box>
-            <Heading mb={2} fontWeight="normal">
-              Dataset:
-              {' '}
-              {dataset.uri}
-              <ClipboardButton value={dataset.uri} iconOnly ml={2} />
-            </Heading>
-            {!!dataset.extra && (
-              <Flex>
-                <Text mr={1}>Extra:</Text>
-                <Code>{JSON.stringify(dataset.extra)}</Code>
-              </Flex>
-            )}
-            <Flex my={2}>
-              <Text mr={1}>Updated At:</Text>
-              <Time dateTime={dataset.updatedAt} />
-            </Flex>
-            <Flex my={2}>
-              <Text mr={1}>Created At:</Text>
-              <Time dateTime={dataset.createdAt} />
+    <Box mt={[6, 3]} maxWidth="1500px">
+      <Button onClick={onBack}>See all datasets</Button>
+      {isLoading && <Spinner display="block" />}
+      {!!dataset && (
+        <Box>
+          <Heading my={2} fontWeight="normal">
+            Dataset:
+            {' '}
+            {dataset.uri}
+            <ClipboardButton value={dataset.uri} iconOnly ml={2} />
+          </Heading>
+          {!!dataset.extra && (
+            <Flex>
+              <Text mr={1}>Extra:</Text>
+              <Code>{JSON.stringify(dataset.extra)}</Code>
             </Flex>
-          </Box>
-        )}
-        <Button onClick={onBack}>See all datasets</Button>
-      </Flex>
+          )}
+          <Flex my={2}>
+            <Text mr={1}>Updated At:</Text>
+            <Time dateTime={dataset.updatedAt} />
+          </Flex>
+          <Flex my={2}>
+            <Text mr={1}>Created At:</Text>
+            <Time dateTime={dataset.createdAt} />
+          </Flex>
+        </Box>
+      )}
       <Heading size="lg" mt={3} mb={2} fontWeight="normal">Upstream Events</Heading>
       <Text>Whenever a DAG has updated this dataset.</Text>
       <Table
diff --git a/airflow/www/static/js/datasets/List.tsx b/airflow/www/static/js/datasets/List.tsx
index 99c7e2aabe..ee559bfa5a 100644
--- a/airflow/www/static/js/datasets/List.tsx
+++ b/airflow/www/static/js/datasets/List.tsx
@@ -20,24 +20,19 @@
 import React, { useMemo, useState } from 'react';
 import {
   Box,
-  Code,
   Heading,
 } from '@chakra-ui/react';
 import { snakeCase } from 'lodash';
 import type { Row, SortingRule } from 'react-table';
 
 import { useDatasets } from 'src/api';
-import Table from 'src/components/Table';
-import Time from 'src/components/Time';
+import { Table, TimeCell, CodeCell } from 'src/components/Table';
 import type { API } from 'src/types';
 
 interface Props {
   onSelect: (datasetId: string) => void;
 }
 
-const TimeCell = ({ cell: { value } }: any) => <Time dateTime={value} />;
-const CodeCell = ({ cell: { value } }: any) => <Code>{value}</Code>;
-
 const DatasetsList = ({ onSelect }: Props) => {
   const limit = 25;
   const [offset, setOffset] = useState(0);
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index a6f52d269b..bbbbde14e2 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -69,6 +69,7 @@
   <meta name="tasks_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_task_endpoint_get_tasks', dag_id=dag.dag_id) }}">
   <meta name="mapped_instances_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_task_instance_endpoint_get_mapped_task_instances', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_') }}">
   <meta name="task_log_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_log_endpoint_get_log', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_', task_id='_TASK_ID_', task_try_number='-1') }}">
+  <meta name="upstream_dataset_events_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dag_run_endpoint_get_upstream_dataset_events', dag_id=dag.dag_id, dag_run_id='_DAG_RUN_ID_') }}">
   <!-- End Urls -->
   <meta name="is_paused" content="{{ dag_is_paused }}">
   <meta name="csrf_token" content="{{ csrf_token() }}">