You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ry...@apache.org on 2021/03/30 00:32:34 UTC

[airflow] branch master updated: Add query mutations to new UI (#15068)

This is an automated email from the ASF dual-hosted git repository.

ryanahamilton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ca49b6  Add query mutations to new UI (#15068)
9ca49b6 is described below

commit 9ca49b69113bb2a1eaa0f8cec2b5f8598efc19ea
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Mon Mar 29 18:32:11 2021 -0600

    Add query mutations to new UI (#15068)
    
    * UI: add save and trigger dag mutations
    
    * testing, names and table header
    
    * use pipeline in url
    
    * linting
    
    * use humps.decamelize and duration var
    
    * missing toast durations
---
 airflow/ui/src/api/index.ts                        | 122 ++++++++++++++++++++-
 airflow/ui/src/components/TriggerRunModal.tsx      |  83 ++++++++++++++
 airflow/ui/src/interfaces/api.ts                   |   7 ++
 .../ui/src/{interfaces/api.ts => utils/memo.ts}    |  23 ++--
 airflow/ui/src/views/Pipelines/Row.tsx             | 115 +++++++++++++++++++
 airflow/ui/src/views/Pipelines/index.tsx           |  33 +-----
 airflow/ui/test/Pipelines.test.tsx                 |  43 +++++++-
 7 files changed, 379 insertions(+), 47 deletions(-)

diff --git a/airflow/ui/src/api/index.ts b/airflow/ui/src/api/index.ts
index 3bee358..a41ce8d 100644
--- a/airflow/ui/src/api/index.ts
+++ b/airflow/ui/src/api/index.ts
@@ -19,11 +19,19 @@
  */
 
 import axios, { AxiosResponse } from 'axios';
-import { useQuery, setLogger } from 'react-query';
+import {
+  useMutation, useQuery, useQueryClient, setLogger,
+} from 'react-query';
 import humps from 'humps';
+import { useToast } from '@chakra-ui/react';
 
 import type { Dag, DagRun, Version } from 'interfaces';
-import type { DagsResponse, DagRunsResponse, TaskInstancesResponse } from 'interfaces/api';
+import type {
+  DagsResponse,
+  DagRunsResponse,
+  TaskInstancesResponse,
+  TriggerRunRequest,
+} from 'interfaces/api';
 
 axios.defaults.baseURL = `${process.env.WEBSERVER_URL}/api/v1`;
 axios.interceptors.response.use(
@@ -39,6 +47,7 @@ setLogger({
   error: isTest ? () => {} : console.warn,
 });
 
+const toastDuration = 3000;
 const refetchInterval = isTest ? false : 1000;
 
 export function useDags() {
@@ -75,3 +84,112 @@ export function useVersion() {
     (): Promise<Version> => axios.get('/version'),
   );
 }
+
+export function useTriggerRun(dagId: Dag['dagId']) {
+  const queryClient = useQueryClient();
+  const toast = useToast();
+  return useMutation(
+    (trigger: TriggerRunRequest) => axios.post(`dags/${dagId}/dagRuns`, humps.decamelizeKeys(trigger)),
+    {
+      onSettled: (res, error) => {
+        if (error) {
+          toast({
+            title: 'Error triggering DAG',
+            description: (error as Error).message,
+            status: 'error',
+            duration: toastDuration,
+            isClosable: true,
+          });
+        } else {
+          toast({
+            title: 'DAG Triggered',
+            status: 'success',
+            duration: toastDuration,
+            isClosable: true,
+          });
+          const dagRunData = queryClient.getQueryData(['dagRun', dagId]) as unknown as DagRunsResponse;
+          if (dagRunData) {
+            queryClient.setQueryData(['dagRun', dagId], {
+              dagRuns: [...dagRunData.dagRuns, res],
+              totalEntries: dagRunData.totalEntries += 1,
+            });
+          } else {
+            queryClient.setQueryData(['dagRun', dagId], {
+              dagRuns: [res],
+              totalEntries: 1,
+            });
+          }
+        }
+        queryClient.invalidateQueries(['dagRun', dagId]);
+      },
+    },
+  );
+}
+
+export function useSaveDag(dagId: Dag['dagId']) {
+  const queryClient = useQueryClient();
+  const toast = useToast();
+  return useMutation(
+    (updatedValues: Record<string, any>) => axios.patch(`dags/${dagId}`, humps.decamelizeKeys(updatedValues)),
+    {
+      onMutate: async (updatedValues: Record<string, any>) => {
+        await queryClient.cancelQueries(['dag', dagId]);
+        const previousDag = queryClient.getQueryData(['dag', dagId]) as Dag;
+        const previousDags = queryClient.getQueryData('dags') as DagsResponse;
+
+        const newDags = previousDags.dags.map((dag) => (
+          dag.dagId === dagId ? { ...dag, ...updatedValues } : dag
+        ));
+        const newDag = {
+          ...previousDag,
+          ...updatedValues,
+        };
+
+        // optimistically set the dag before the async request
+        queryClient.setQueryData(['dag', dagId], () => newDag);
+        queryClient.setQueryData('dags', (old) => ({
+          ...(old as Dag[]),
+          ...{
+            dags: newDags,
+            totalEntries: previousDags.totalEntries,
+          },
+        }));
+        return { [dagId]: previousDag, dags: previousDags };
+      },
+      onSettled: (res, error, variables, context) => {
+        const previousDag = (context as any)[dagId] as Dag;
+        const previousDags = (context as any).dags as DagsResponse;
+        // rollback to previous cache on error
+        if (error) {
+          queryClient.setQueryData(['dag', dagId], previousDag);
+          queryClient.setQueryData('dags', previousDags);
+          toast({
+            title: 'Error updating pipeline',
+            description: (error as Error).message,
+            status: 'error',
+            duration: toastDuration,
+            isClosable: true,
+          });
+        } else {
+          // check if server response is different from our optimistic update
+          if (JSON.stringify(res) !== JSON.stringify(previousDag)) {
+            queryClient.setQueryData(['dag', dagId], res);
+            queryClient.setQueryData('dags', {
+              dags: previousDags.dags.map((dag) => (
+                dag.dagId === dagId ? res : dag
+              )),
+              totalEntries: previousDags.totalEntries,
+            });
+          }
+          toast({
+            title: 'Pipeline Updated',
+            status: 'success',
+            duration: toastDuration,
+            isClosable: true,
+          });
+        }
+        queryClient.invalidateQueries(['dag', dagId]);
+      },
+    },
+  );
+}
diff --git a/airflow/ui/src/components/TriggerRunModal.tsx b/airflow/ui/src/components/TriggerRunModal.tsx
new file mode 100644
index 0000000..c988c82
--- /dev/null
+++ b/airflow/ui/src/components/TriggerRunModal.tsx
@@ -0,0 +1,83 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React, { ChangeEvent, useState } from 'react';
+import {
+  Button,
+  FormControl,
+  FormLabel,
+  Modal,
+  ModalHeader,
+  ModalFooter,
+  ModalCloseButton,
+  ModalOverlay,
+  ModalContent,
+  ModalBody,
+  Textarea,
+} from '@chakra-ui/react';
+
+import type { Dag } from 'interfaces';
+import { useTriggerRun } from 'api';
+
+interface Props {
+  dagId: Dag['dagId'];
+  isOpen: boolean;
+  onClose: () => void;
+}
+
+const TriggerRunModal: React.FC<Props> = ({ dagId, isOpen, onClose }) => {
+  const mutation = useTriggerRun(dagId);
+  const [config, setConfig] = useState('{}');
+
+  const onTrigger = () => {
+    mutation.mutate({
+      conf: JSON.parse(config),
+      executionDate: new Date(),
+    });
+    onClose();
+  };
+
+  return (
+    <Modal size="lg" isOpen={isOpen} onClose={onClose}>
+      <ModalOverlay />
+      <ModalContent>
+        <ModalHeader>
+          Trigger Run:
+          {' '}
+          {dagId}
+        </ModalHeader>
+        <ModalCloseButton />
+        <ModalBody>
+          <FormControl>
+            <FormLabel htmlFor="configuration">Configuration JSON (Optional)</FormLabel>
+            <Textarea name="configuration" value={config} onChange={(e: ChangeEvent<HTMLTextAreaElement>) => setConfig(e.target.value)} />
+          </FormControl>
+        </ModalBody>
+        <ModalFooter>
+          <Button variant="ghost" onClick={onClose}>Cancel</Button>
+          <Button ml={2} onClick={onTrigger}>
+            Trigger
+          </Button>
+        </ModalFooter>
+      </ModalContent>
+    </Modal>
+  );
+};
+
+export default TriggerRunModal;
diff --git a/airflow/ui/src/interfaces/api.ts b/airflow/ui/src/interfaces/api.ts
index 6668372..855f2bc 100644
--- a/airflow/ui/src/interfaces/api.ts
+++ b/airflow/ui/src/interfaces/api.ts
@@ -34,3 +34,10 @@ export interface DagRunsResponse extends Entries {
 export interface TaskInstancesResponse extends Entries {
   taskInstances: TaskInstance[];
 }
+
+export interface TriggerRunRequest {
+  conf: Record<string, any>;
+  dagRunId?: string;
+  executionDate: Date;
+  state?: 'success' | 'running' | 'failed';
+}
diff --git a/airflow/ui/src/interfaces/api.ts b/airflow/ui/src/utils/memo.ts
similarity index 70%
copy from airflow/ui/src/interfaces/api.ts
copy to airflow/ui/src/utils/memo.ts
index 6668372..1257913 100644
--- a/airflow/ui/src/interfaces/api.ts
+++ b/airflow/ui/src/utils/memo.ts
@@ -17,20 +17,13 @@
  * under the License.
  */
 
-import type { Dag, DagRun, TaskInstance } from './index';
+import type { PropsWithChildren } from 'react';
 
-interface Entries {
-  totalEntries: number;
-}
+const compareObjectProps = (
+  prevProps: PropsWithChildren<any>,
+  nextProps: PropsWithChildren<any>,
+) => (
+  JSON.stringify(prevProps) === JSON.stringify(nextProps)
+);
 
-export interface DagsResponse extends Entries {
-  dags: Dag[];
-}
-
-export interface DagRunsResponse extends Entries {
-  dagRuns: DagRun[];
-}
-
-export interface TaskInstancesResponse extends Entries {
-  taskInstances: TaskInstance[];
-}
+export default compareObjectProps;
diff --git a/airflow/ui/src/views/Pipelines/Row.tsx b/airflow/ui/src/views/Pipelines/Row.tsx
new file mode 100644
index 0000000..deff408
--- /dev/null
+++ b/airflow/ui/src/views/Pipelines/Row.tsx
@@ -0,0 +1,115 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React from 'react';
+import { Link as RouterLink } from 'react-router-dom';
+import {
+  Flex,
+  Link,
+  Tr,
+  Td,
+  Tag,
+  Tooltip,
+  useColorModeValue,
+  Switch,
+  useDisclosure,
+  IconButton,
+} from '@chakra-ui/react';
+
+import TriggerRunModal from 'components/TriggerRunModal';
+import compareObjectProps from 'utils/memo';
+import type { Dag, DagTag } from 'interfaces';
+import { useSaveDag } from 'api';
+import { MdPlayArrow } from 'react-icons/md';
+
+interface Props {
+  dag: Dag;
+}
+
+const Row: React.FC<Props> = ({ dag }) => {
+  const { isOpen, onToggle, onClose } = useDisclosure();
+  const mutation = useSaveDag(dag.dagId);
+  const togglePaused = () => mutation.mutate({ isPaused: !dag.isPaused });
+
+  const oddColor = useColorModeValue('gray.50', 'gray.900');
+  const hoverColor = useColorModeValue('gray.100', 'gray.700');
+
+  return (
+    <Tr
+      _odd={{ backgroundColor: oddColor }}
+      _hover={{ backgroundColor: hoverColor }}
+    >
+      <Td onClick={(e) => e.stopPropagation()} paddingRight="0" width="58px">
+        <Tooltip
+          label={dag.isPaused ? 'Activate DAG' : 'Pause DAG'}
+          aria-label={dag.isPaused ? 'Activate DAG' : 'Pause DAG'}
+          hasArrow
+        >
+          {/* span helps tooltip find its position */}
+          <span>
+            <Switch
+              role="switch"
+              isChecked={!dag.isPaused}
+              onChange={togglePaused}
+            />
+          </span>
+        </Tooltip>
+      </Td>
+      <Td>
+        <Flex alignItems="center">
+          <Link
+            as={RouterLink}
+            to={`/pipelines/${dag.dagId}`}
+            fontWeight="bold"
+          >
+            {dag.dagId}
+          </Link>
+          {dag.tags.map((tag: DagTag) => (
+            <Tag
+              size="sm"
+              mt="1"
+              ml="1"
+              mb="1"
+              key={tag.name}
+            >
+              {tag.name}
+            </Tag>
+          ))}
+        </Flex>
+      </Td>
+      <Td textAlign="right">
+        <Tooltip
+          label="Trigger DAG"
+          aria-label="Trigger DAG"
+          hasArrow
+        >
+          <IconButton
+            size="sm"
+            aria-label="Trigger Dag"
+            icon={<MdPlayArrow />}
+            onClick={onToggle}
+          />
+        </Tooltip>
+        <TriggerRunModal dagId={dag.dagId} isOpen={isOpen} onClose={onClose} />
+      </Td>
+    </Tr>
+  );
+};
+
+export default React.memo(Row, compareObjectProps);
diff --git a/airflow/ui/src/views/Pipelines/index.tsx b/airflow/ui/src/views/Pipelines/index.tsx
index 804ae0f..c5fbcdc 100644
--- a/airflow/ui/src/views/Pipelines/index.tsx
+++ b/airflow/ui/src/views/Pipelines/index.tsx
@@ -18,30 +18,25 @@
  */
 
 import React from 'react';
-import { Link as RouterLink } from 'react-router-dom';
 import {
-  Link,
   Alert,
   AlertIcon,
-  Flex,
   Table,
   Thead,
   Tbody,
   Tr,
   Th,
   Td,
-  useColorModeValue,
 } from '@chakra-ui/react';
 
 import AppContainer from 'components/AppContainer';
 import { defaultDags } from 'api/defaults';
 import { useDags } from 'api';
 import type { Dag } from 'interfaces';
+import Row from './Row';
 
 const Pipelines: React.FC = () => {
   const { data: { dags } = defaultDags, isLoading, error } = useDags();
-  const oddColor = useColorModeValue('gray.50', 'gray.900');
-  const evenColor = useColorModeValue('gray.100', 'gray.700');
 
   return (
     <AppContainer>
@@ -57,7 +52,9 @@ const Pipelines: React.FC = () => {
             borderBottomWidth="1px"
             textAlign="left"
           >
+            <Th />
             <Th>DAG ID</Th>
+            <Th />
           </Tr>
         </Thead>
         <Tbody>
@@ -71,29 +68,7 @@ const Pipelines: React.FC = () => {
             <Td colSpan={2}>No Pipelines found.</Td>
           </Tr>
           )}
-          {dags.map((dag: Dag) => (
-            <Tr
-              key={dag.dagId}
-              _odd={{
-                backgroundColor: oddColor,
-              }}
-              _hover={{
-                backgroundColor: evenColor,
-              }}
-            >
-              <Td>
-                <Flex alignItems="center">
-                  <Link
-                    as={RouterLink}
-                    to={`/pipelines/${dag.dagId}`}
-                    fontWeight="bold"
-                  >
-                    {dag.dagId}
-                  </Link>
-                </Flex>
-              </Td>
-            </Tr>
-          ))}
+          {dags.map((dag: Dag) => <Row key={dag.dagId} dag={dag} />)}
         </Tbody>
       </Table>
     </AppContainer>
diff --git a/airflow/ui/test/Pipelines.test.tsx b/airflow/ui/test/Pipelines.test.tsx
index b40f360..a5015d4 100644
--- a/airflow/ui/test/Pipelines.test.tsx
+++ b/airflow/ui/test/Pipelines.test.tsx
@@ -19,14 +19,17 @@
 
 import React from 'react';
 import '@testing-library/jest-dom';
-import { render, waitFor } from '@testing-library/react';
+import { render, waitFor, fireEvent } from '@testing-library/react';
 import nock from 'nock';
+import axios from 'axios';
 
 import Pipelines from 'views/Pipelines';
 import {
   defaultHeaders, QueryWrapper, RouterWrapper, url,
 } from './utils';
 
+axios.defaults.adapter = require('axios/lib/adapters/http');
+
 const sampleDag = {
   dagId: 'dagId1',
   description: 'string',
@@ -86,12 +89,50 @@ describe('Test Pipelines Table', () => {
         totalEntries: 0,
       });
 
+    nock(url)
+      .defaultReplyHeaders(defaultHeaders)
+      .persist()
+      .intercept(`/dags/${sampleDag.dagId}`, 'PATCH')
+      .reply(200, { ...sampleDag, ...{ isPaused: !sampleDag.isPaused } });
+
     const { getByText } = render(
       <QueryWrapper><Pipelines /></QueryWrapper>,
       {
         wrapper: RouterWrapper,
       },
     );
+
     await waitFor(() => expect(getByText('No Pipelines found.')).toBeInTheDocument());
   });
+
+  test('Toggle a pipeline on/off', async () => {
+    nock(url)
+      .defaultReplyHeaders(defaultHeaders)
+      .get('/dags')
+      .reply(200, {
+        dags: [sampleDag],
+        totalEntries: 1,
+      });
+
+    nock(url)
+      .defaultReplyHeaders(defaultHeaders)
+      .persist()
+      .intercept(`/dags/${sampleDag.dagId}`, 'PATCH')
+      .reply(200, { ...sampleDag, ...{ isPaused: !sampleDag.isPaused } });
+
+    const { getByText, getByRole } = render(
+      <QueryWrapper><Pipelines /></QueryWrapper>,
+      {
+        wrapper: RouterWrapper,
+      },
+    );
+
+    await waitFor(() => expect(getByText(sampleDag.dagId)).toBeInTheDocument());
+    const toggle = getByRole('switch');
+    expect(toggle.firstChild?.checked).toBeTruthy();
+    fireEvent.click(toggle);
+    // 'Dag Updated' is the toast confirming the change happened
+    await waitFor(() => expect(getByText('Pipeline Updated')).toBeInTheDocument());
+    await waitFor(() => expect(toggle.firstChild?.checked).toBeFalsy());
+  });
 });