You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ry...@apache.org on 2021/03/30 00:32:34 UTC
[airflow] branch master updated: Add query mutations to new UI
(#15068)
This is an automated email from the ASF dual-hosted git repository.
ryanahamilton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 9ca49b6 Add query mutations to new UI (#15068)
9ca49b6 is described below
commit 9ca49b69113bb2a1eaa0f8cec2b5f8598efc19ea
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Mon Mar 29 18:32:11 2021 -0600
Add query mutations to new UI (#15068)
* UI: add save and trigger dag mutations
* testing, names and table header
* use pipeline in url
* linting
* use humps.decamelize and duration var
* missing toast durations
---
airflow/ui/src/api/index.ts | 122 ++++++++++++++++++++-
airflow/ui/src/components/TriggerRunModal.tsx | 83 ++++++++++++++
airflow/ui/src/interfaces/api.ts | 7 ++
.../ui/src/{interfaces/api.ts => utils/memo.ts} | 23 ++--
airflow/ui/src/views/Pipelines/Row.tsx | 115 +++++++++++++++++++
airflow/ui/src/views/Pipelines/index.tsx | 33 +-----
airflow/ui/test/Pipelines.test.tsx | 43 +++++++-
7 files changed, 379 insertions(+), 47 deletions(-)
diff --git a/airflow/ui/src/api/index.ts b/airflow/ui/src/api/index.ts
index 3bee358..a41ce8d 100644
--- a/airflow/ui/src/api/index.ts
+++ b/airflow/ui/src/api/index.ts
@@ -19,11 +19,19 @@
*/
import axios, { AxiosResponse } from 'axios';
-import { useQuery, setLogger } from 'react-query';
+import {
+ useMutation, useQuery, useQueryClient, setLogger,
+} from 'react-query';
import humps from 'humps';
+import { useToast } from '@chakra-ui/react';
import type { Dag, DagRun, Version } from 'interfaces';
-import type { DagsResponse, DagRunsResponse, TaskInstancesResponse } from 'interfaces/api';
+import type {
+ DagsResponse,
+ DagRunsResponse,
+ TaskInstancesResponse,
+ TriggerRunRequest,
+} from 'interfaces/api';
axios.defaults.baseURL = `${process.env.WEBSERVER_URL}/api/v1`;
axios.interceptors.response.use(
@@ -39,6 +47,7 @@ setLogger({
error: isTest ? () => {} : console.warn,
});
+const toastDuration = 3000;
const refetchInterval = isTest ? false : 1000;
export function useDags() {
@@ -75,3 +84,112 @@ export function useVersion() {
(): Promise<Version> => axios.get('/version'),
);
}
+
+/**
+ * Mutation hook that triggers a new run of the given DAG.
+ *
+ * POSTs the (decamelized) request to `dags/{dagId}/dagRuns`. Once the request
+ * settles it shows a success or error toast, appends the created run to the
+ * cached `['dagRun', dagId]` query on success, and always invalidates that
+ * query so the server state is refetched.
+ *
+ * @param dagId id of the DAG to trigger
+ * @returns the react-query mutation object; call `mutate(request)` to trigger
+ */
+export function useTriggerRun(dagId: Dag['dagId']) {
+  const queryClient = useQueryClient();
+  const toast = useToast();
+  return useMutation(
+    (trigger: TriggerRunRequest) => axios.post(`dags/${dagId}/dagRuns`, humps.decamelizeKeys(trigger)),
+    {
+      onSettled: (res, error) => {
+        if (error) {
+          toast({
+            title: 'Error triggering DAG',
+            description: (error as Error).message,
+            status: 'error',
+            duration: toastDuration,
+            isClosable: true,
+          });
+        } else {
+          toast({
+            title: 'DAG Triggered',
+            status: 'success',
+            duration: toastDuration,
+            isClosable: true,
+          });
+          const dagRunData = queryClient.getQueryData<DagRunsResponse>(['dagRun', dagId]);
+          // Build a fresh cache entry; the cached object itself must never be
+          // mutated (the previous `totalEntries += 1` changed it in place).
+          queryClient.setQueryData(['dagRun', dagId], {
+            dagRuns: dagRunData ? [...dagRunData.dagRuns, res] : [res],
+            totalEntries: dagRunData ? dagRunData.totalEntries + 1 : 1,
+          });
+        }
+        queryClient.invalidateQueries(['dagRun', dagId]);
+      },
+    },
+  );
+}
+
+/** Cache snapshot taken in `onMutate` so `onSettled` can roll back on error. */
+interface SaveDagContext {
+  previousDag?: Dag;
+  previousDags?: DagsResponse;
+}
+
+/**
+ * Mutation hook that PATCHes `dags/{dagId}` with the given values and mirrors
+ * the change optimistically in the query cache.
+ *
+ * Before the request is sent, the cached `['dag', dagId]` entry and the
+ * matching entry of the cached `'dags'` list (whichever of them exist) are
+ * updated with the new values. On error both are restored from the snapshot
+ * and an error toast is shown; on success the server response replaces the
+ * optimistic values if it differs, and a success toast is shown.
+ *
+ * @param dagId id of the DAG to update
+ * @returns the react-query mutation object; call `mutate(updatedValues)`
+ */
+export function useSaveDag(dagId: Dag['dagId']) {
+  const queryClient = useQueryClient();
+  const toast = useToast();
+  return useMutation(
+    (updatedValues: Record<string, any>) => axios.patch(`dags/${dagId}`, humps.decamelizeKeys(updatedValues)),
+    {
+      onMutate: async (updatedValues: Record<string, any>): Promise<SaveDagContext> => {
+        // cancel in-flight fetches of both caches we are about to touch so a
+        // late response cannot overwrite the optimistic values
+        await Promise.all([
+          queryClient.cancelQueries(['dag', dagId]),
+          queryClient.cancelQueries('dags'),
+        ]);
+        const previousDag = queryClient.getQueryData<Dag>(['dag', dagId]);
+        const previousDags = queryClient.getQueryData<DagsResponse>('dags');
+
+        // optimistically set the dag before the async request; either cache
+        // entry may be absent, in which case there is nothing to update
+        if (previousDag) {
+          queryClient.setQueryData(['dag', dagId], { ...previousDag, ...updatedValues });
+        }
+        if (previousDags) {
+          queryClient.setQueryData('dags', {
+            ...previousDags,
+            dags: previousDags.dags.map((dag) => (
+              dag.dagId === dagId ? { ...dag, ...updatedValues } : dag
+            )),
+          });
+        }
+        // fixed key names: a key derived from dagId could collide with `dags`
+        return { previousDag, previousDags };
+      },
+      onSettled: (res, error, variables, context) => {
+        // context is undefined when onMutate itself failed
+        const snapshot = context as SaveDagContext | undefined;
+        if (error) {
+          // rollback to previous cache on error
+          if (snapshot?.previousDag) {
+            queryClient.setQueryData(['dag', dagId], snapshot.previousDag);
+          }
+          if (snapshot?.previousDags) {
+            queryClient.setQueryData('dags', snapshot.previousDags);
+          }
+          toast({
+            title: 'Error updating pipeline',
+            description: (error as Error).message,
+            status: 'error',
+            duration: toastDuration,
+            isClosable: true,
+          });
+        } else {
+          // check if server response is different from our optimistic update,
+          // which is what the cache holds at this point
+          const optimisticDag = queryClient.getQueryData(['dag', dagId]);
+          if (JSON.stringify(res) !== JSON.stringify(optimisticDag)) {
+            queryClient.setQueryData(['dag', dagId], res);
+            // patch the current list rather than the pre-mutation snapshot so
+            // concurrent updates to other dags are not reverted
+            const currentDags = queryClient.getQueryData<DagsResponse>('dags');
+            if (currentDags) {
+              queryClient.setQueryData('dags', {
+                ...currentDags,
+                dags: currentDags.dags.map((dag) => (
+                  dag.dagId === dagId ? res : dag
+                )),
+              });
+            }
+          }
+          toast({
+            title: 'Pipeline Updated',
+            status: 'success',
+            duration: toastDuration,
+            isClosable: true,
+          });
+        }
+        queryClient.invalidateQueries(['dag', dagId]);
+      },
+    },
+  );
+}
diff --git a/airflow/ui/src/components/TriggerRunModal.tsx b/airflow/ui/src/components/TriggerRunModal.tsx
new file mode 100644
index 0000000..c988c82
--- /dev/null
+++ b/airflow/ui/src/components/TriggerRunModal.tsx
@@ -0,0 +1,83 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React, { ChangeEvent, useState } from 'react';
+import {
+ Button,
+ FormControl,
+ FormLabel,
+ Modal,
+ ModalHeader,
+ ModalFooter,
+ ModalCloseButton,
+ ModalOverlay,
+ ModalContent,
+ ModalBody,
+ Textarea,
+} from '@chakra-ui/react';
+
+import type { Dag } from 'interfaces';
+import { useTriggerRun } from 'api';
+
+interface Props {
+ dagId: Dag['dagId'];
+ isOpen: boolean;
+ onClose: () => void;
+}
+
+/**
+ * Parse the textarea contents into a DAG run `conf` object.
+ *
+ * Blank input counts as an empty configuration because the field is optional.
+ * Returns `undefined` when the text is not valid JSON or is valid JSON that is
+ * not a plain object (array, string, number, boolean, null).
+ */
+const parseConf = (raw: string): Record<string, any> | undefined => {
+  if (raw.trim() === '') return {};
+  try {
+    const parsed = JSON.parse(raw);
+    const isObject = typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed);
+    return isObject ? parsed : undefined;
+  } catch (e) {
+    return undefined;
+  }
+};
+
+/**
+ * Modal dialog that lets the user trigger a run of `dagId` with an optional
+ * JSON configuration.
+ *
+ * The Trigger button is disabled and the field is flagged invalid while the
+ * text does not parse to a JSON object, so the click handler never throws.
+ * Triggering sends the mutation with the current time as execution date and
+ * closes the modal; the outcome is reported by the mutation's toast.
+ */
+const TriggerRunModal: React.FC<Props> = ({ dagId, isOpen, onClose }) => {
+  const mutation = useTriggerRun(dagId);
+  const [config, setConfig] = useState('{}');
+  const conf = parseConf(config);
+  const isInvalid = conf === undefined;
+
+  const onTrigger = () => {
+    if (conf === undefined) return;
+    mutation.mutate({
+      conf,
+      executionDate: new Date(),
+    });
+    onClose();
+  };
+
+  return (
+    <Modal size="lg" isOpen={isOpen} onClose={onClose}>
+      <ModalOverlay />
+      <ModalContent>
+        <ModalHeader>
+          Trigger Run:
+          {' '}
+          {dagId}
+        </ModalHeader>
+        <ModalCloseButton />
+        <ModalBody>
+          <FormControl isInvalid={isInvalid}>
+            <FormLabel htmlFor="configuration">Configuration JSON (Optional)</FormLabel>
+            {/* id must match the label's htmlFor so the label is tied to the field */}
+            <Textarea
+              id="configuration"
+              name="configuration"
+              value={config}
+              onChange={(e: ChangeEvent<HTMLTextAreaElement>) => setConfig(e.target.value)}
+            />
+          </FormControl>
+        </ModalBody>
+        <ModalFooter>
+          <Button variant="ghost" onClick={onClose}>Cancel</Button>
+          <Button ml={2} onClick={onTrigger} isDisabled={isInvalid}>
+            Trigger
+          </Button>
+        </ModalFooter>
+      </ModalContent>
+    </Modal>
+  );
+};
+
+export default TriggerRunModal;
diff --git a/airflow/ui/src/interfaces/api.ts b/airflow/ui/src/interfaces/api.ts
index 6668372..855f2bc 100644
--- a/airflow/ui/src/interfaces/api.ts
+++ b/airflow/ui/src/interfaces/api.ts
@@ -34,3 +34,10 @@ export interface DagRunsResponse extends Entries {
export interface TaskInstancesResponse extends Entries {
taskInstances: TaskInstance[];
}
+
+/**
+ * Request body used by `useTriggerRun` when POSTing to `dags/{dagId}/dagRuns`.
+ * Keys are written in camelCase here and decamelized before being sent.
+ */
+export interface TriggerRunRequest {
+ // free-form run configuration object
+ conf: Record<string, any>;
+ // optional; omitted by the trigger modal
+ dagRunId?: string;
+ executionDate: Date;
+ state?: 'success' | 'running' | 'failed';
+}
diff --git a/airflow/ui/src/interfaces/api.ts b/airflow/ui/src/utils/memo.ts
similarity index 70%
copy from airflow/ui/src/interfaces/api.ts
copy to airflow/ui/src/utils/memo.ts
index 6668372..1257913 100644
--- a/airflow/ui/src/interfaces/api.ts
+++ b/airflow/ui/src/utils/memo.ts
@@ -17,20 +17,13 @@
* under the License.
*/
-import type { Dag, DagRun, TaskInstance } from './index';
+import type { PropsWithChildren } from 'react';
-interface Entries {
- totalEntries: number;
-}
+/**
+ * Props comparator intended for `React.memo`: returns true (skip re-render)
+ * when both props objects serialize to the same JSON string.
+ *
+ * Caveats that follow from using `JSON.stringify`: function-valued and
+ * `undefined` props are left out of the serialized form, so changes to them
+ * are not detected; the result depends on key order; and props containing
+ * circular references make it throw.
+ */
+const compareObjectProps = (
+ prevProps: PropsWithChildren<any>,
+ nextProps: PropsWithChildren<any>,
+) => (
+ JSON.stringify(prevProps) === JSON.stringify(nextProps)
+);
-export interface DagsResponse extends Entries {
- dags: Dag[];
-}
-
-export interface DagRunsResponse extends Entries {
- dagRuns: DagRun[];
-}
-
-export interface TaskInstancesResponse extends Entries {
- taskInstances: TaskInstance[];
-}
+export default compareObjectProps;
diff --git a/airflow/ui/src/views/Pipelines/Row.tsx b/airflow/ui/src/views/Pipelines/Row.tsx
new file mode 100644
index 0000000..deff408
--- /dev/null
+++ b/airflow/ui/src/views/Pipelines/Row.tsx
@@ -0,0 +1,115 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React from 'react';
+import { Link as RouterLink } from 'react-router-dom';
+import {
+ Flex,
+ Link,
+ Tr,
+ Td,
+ Tag,
+ Tooltip,
+ useColorModeValue,
+ Switch,
+ useDisclosure,
+ IconButton,
+} from '@chakra-ui/react';
+
+import TriggerRunModal from 'components/TriggerRunModal';
+import compareObjectProps from 'utils/memo';
+import type { Dag, DagTag } from 'interfaces';
+import { useSaveDag } from 'api';
+import { MdPlayArrow } from 'react-icons/md';
+
+interface Props {
+ dag: Dag;
+}
+
+/**
+ * One row of the pipelines table for a single DAG.
+ *
+ * Renders, left to right: a switch that pauses/activates the DAG, the DAG id
+ * as a link to `/pipelines/{dagId}` followed by its tags, and a button that
+ * opens the trigger-run modal for this DAG.
+ */
+const Row: React.FC<Props> = ({ dag }) => {
+ // open/close state of the trigger-run modal
+ const { isOpen, onToggle, onClose } = useDisclosure();
+ const mutation = useSaveDag(dag.dagId);
+ // flip the paused flag through the save mutation
+ const togglePaused = () => mutation.mutate({ isPaused: !dag.isPaused });
+
+ // row background colors, chosen per color mode (first value light, second dark)
+ const oddColor = useColorModeValue('gray.50', 'gray.900');
+ const hoverColor = useColorModeValue('gray.100', 'gray.700');
+
+ return (
+ <Tr
+ _odd={{ backgroundColor: oddColor }}
+ _hover={{ backgroundColor: hoverColor }}
+ >
+ <Td onClick={(e) => e.stopPropagation()} paddingRight="0" width="58px">
+ <Tooltip
+ label={dag.isPaused ? 'Activate DAG' : 'Pause DAG'}
+ aria-label={dag.isPaused ? 'Activate DAG' : 'Pause DAG'}
+ hasArrow
+ >
+ {/* span helps tooltip find its position */}
+ <span>
+ <Switch
+ role="switch"
+ isChecked={!dag.isPaused}
+ onChange={togglePaused}
+ />
+ </span>
+ </Tooltip>
+ </Td>
+ <Td>
+ <Flex alignItems="center">
+ <Link
+ as={RouterLink}
+ to={`/pipelines/${dag.dagId}`}
+ fontWeight="bold"
+ >
+ {dag.dagId}
+ </Link>
+ {dag.tags.map((tag: DagTag) => (
+ <Tag
+ size="sm"
+ mt="1"
+ ml="1"
+ mb="1"
+ key={tag.name}
+ >
+ {tag.name}
+ </Tag>
+ ))}
+ </Flex>
+ </Td>
+ <Td textAlign="right">
+ <Tooltip
+ label="Trigger DAG"
+ aria-label="Trigger DAG"
+ hasArrow
+ >
+ <IconButton
+ size="sm"
+ aria-label="Trigger Dag"
+ icon={<MdPlayArrow />}
+ onClick={onToggle}
+ />
+ </Tooltip>
+ <TriggerRunModal dagId={dag.dagId} isOpen={isOpen} onClose={onClose} />
+ </Td>
+ </Tr>
+ );
+};
+
+export default React.memo(Row, compareObjectProps);
diff --git a/airflow/ui/src/views/Pipelines/index.tsx b/airflow/ui/src/views/Pipelines/index.tsx
index 804ae0f..c5fbcdc 100644
--- a/airflow/ui/src/views/Pipelines/index.tsx
+++ b/airflow/ui/src/views/Pipelines/index.tsx
@@ -18,30 +18,25 @@
*/
import React from 'react';
-import { Link as RouterLink } from 'react-router-dom';
import {
- Link,
Alert,
AlertIcon,
- Flex,
Table,
Thead,
Tbody,
Tr,
Th,
Td,
- useColorModeValue,
} from '@chakra-ui/react';
import AppContainer from 'components/AppContainer';
import { defaultDags } from 'api/defaults';
import { useDags } from 'api';
import type { Dag } from 'interfaces';
+import Row from './Row';
const Pipelines: React.FC = () => {
const { data: { dags } = defaultDags, isLoading, error } = useDags();
- const oddColor = useColorModeValue('gray.50', 'gray.900');
- const evenColor = useColorModeValue('gray.100', 'gray.700');
return (
<AppContainer>
@@ -57,7 +52,9 @@ const Pipelines: React.FC = () => {
borderBottomWidth="1px"
textAlign="left"
>
+ <Th />
<Th>DAG ID</Th>
+ <Th />
</Tr>
</Thead>
<Tbody>
@@ -71,29 +68,7 @@ const Pipelines: React.FC = () => {
<Td colSpan={2}>No Pipelines found.</Td>
</Tr>
)}
- {dags.map((dag: Dag) => (
- <Tr
- key={dag.dagId}
- _odd={{
- backgroundColor: oddColor,
- }}
- _hover={{
- backgroundColor: evenColor,
- }}
- >
- <Td>
- <Flex alignItems="center">
- <Link
- as={RouterLink}
- to={`/pipelines/${dag.dagId}`}
- fontWeight="bold"
- >
- {dag.dagId}
- </Link>
- </Flex>
- </Td>
- </Tr>
- ))}
+ {dags.map((dag: Dag) => <Row key={dag.dagId} dag={dag} />)}
</Tbody>
</Table>
</AppContainer>
diff --git a/airflow/ui/test/Pipelines.test.tsx b/airflow/ui/test/Pipelines.test.tsx
index b40f360..a5015d4 100644
--- a/airflow/ui/test/Pipelines.test.tsx
+++ b/airflow/ui/test/Pipelines.test.tsx
@@ -19,14 +19,17 @@
import React from 'react';
import '@testing-library/jest-dom';
-import { render, waitFor } from '@testing-library/react';
+import { render, waitFor, fireEvent } from '@testing-library/react';
import nock from 'nock';
+import axios from 'axios';
import Pipelines from 'views/Pipelines';
import {
defaultHeaders, QueryWrapper, RouterWrapper, url,
} from './utils';
+axios.defaults.adapter = require('axios/lib/adapters/http');
+
const sampleDag = {
dagId: 'dagId1',
description: 'string',
@@ -86,12 +89,50 @@ describe('Test Pipelines Table', () => {
totalEntries: 0,
});
+ nock(url)
+ .defaultReplyHeaders(defaultHeaders)
+ .persist()
+ .intercept(`/dags/${sampleDag.dagId}`, 'PATCH')
+ .reply(200, { ...sampleDag, ...{ isPaused: !sampleDag.isPaused } });
+
const { getByText } = render(
<QueryWrapper><Pipelines /></QueryWrapper>,
{
wrapper: RouterWrapper,
},
);
+
await waitFor(() => expect(getByText('No Pipelines found.')).toBeInTheDocument());
});
+
+ // Renders the table with one DAG, clicks its pause switch and checks that the
+ // success toast appears and the switch ends up unchecked.
+ test('Toggle a pipeline on/off', async () => {
+ nock(url)
+ .defaultReplyHeaders(defaultHeaders)
+ .get('/dags')
+ .reply(200, {
+ dags: [sampleDag],
+ totalEntries: 1,
+ });
+
+ // the PATCH reply is the sample DAG with isPaused flipped
+ nock(url)
+ .defaultReplyHeaders(defaultHeaders)
+ .persist()
+ .intercept(`/dags/${sampleDag.dagId}`, 'PATCH')
+ .reply(200, { ...sampleDag, ...{ isPaused: !sampleDag.isPaused } });
+
+ const { getByText, getByRole } = render(
+ <QueryWrapper><Pipelines /></QueryWrapper>,
+ {
+ wrapper: RouterWrapper,
+ },
+ );
+
+ await waitFor(() => expect(getByText(sampleDag.dagId)).toBeInTheDocument());
+ const toggle = getByRole('switch');
+ expect(toggle.firstChild?.checked).toBeTruthy();
+ fireEvent.click(toggle);
+ // 'Pipeline Updated' is the toast confirming the change happened
+ await waitFor(() => expect(getByText('Pipeline Updated')).toBeInTheDocument());
+ await waitFor(() => expect(toggle.firstChild?.checked).toBeFalsy());
+ });
});