You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/12/27 13:52:25 UTC

[airflow-cancel-workflow-runs] 09/44: Add initial support for schedule, and refactor accordingly

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow-cancel-workflow-runs.git

commit 3f0b5bcf961834fa3d9214acd5305e7c181d5d30
Author: Jason T. Greene <ja...@redhat.com>
AuthorDate: Tue Feb 18 16:24:50 2020 -0600

    Add initial support for schedule, and refactor accordingly
---
 __tests__/main.test.ts |  21 ++++---
 dist/index.js          | 134 +++++++++++++++++++++++++++++---------------
 src/main.ts            | 148 ++++++++++++++++++++++++++++++++++++-------------
 3 files changed, 209 insertions(+), 94 deletions(-)

diff --git a/__tests__/main.test.ts b/__tests__/main.test.ts
index e237067..819d5d4 100644
--- a/__tests__/main.test.ts
+++ b/__tests__/main.test.ts
@@ -7,16 +7,19 @@ test('no op', () => {})
 // shows how the runner will run a javascript action with env / stdout protocol
 // test('test runs', () => {
 //   const ip = path.join(__dirname, '..', 'lib', 'main.js')
-//     process.env['INPUT_TOKEN'] = ''
-//     process.env['GITHUB_RUN_ID'] = '35588693' //'33782469'
-//     process.env['GITHUB_REPOSITORY'] = ''
-//     process.env['GITHUB_HEAD_REF'] = 'refs/heads/n1hility-patch-5'
-//     process.env['GITHUB_EVENT_NAME'] = 'pull_request'
+//   process.env['INPUT_TOKEN'] = ''
+//   //process.env['INPUT_WORKFLOW'] = 'ci-actions.yml'
+//   process.env['GITHUB_RUN_ID'] = '41374869' //'33782469'
+//   process.env['GITHUB_REPOSITORY'] = ''
+//   //process.env['GITHUB_HEAD_REF'] = 'refs/heads/n1hility-patch-5'
+//   process.env['GITHUB_REF'] = 'refs/heads/master'
+//   process.env['GITHUB_EVENT_NAME'] = 'push'
+//   // process.env['GITHUB_EVENT_NAME'] = 'schedule'
 
-// //   process.env['GITHUB_RUN_ID'] = '35599067'
-// //   process.env['GITHUB_REPOSITORY'] = ''
-// //   process.env['GITHUB_REF'] = 'refs/heads/master'
-// //   process.env['GITHUB_EVENT_NAME'] = 'push'
+//   //   process.env['GITHUB_RUN_ID'] = '35599067'
+//   //   process.env['GITHUB_REPOSITORY'] = ''
+//   //   process.env['GITHUB_REF'] = 'refs/heads/master'
+//   //   process.env['GITHUB_EVENT_NAME'] = 'push'
 
 //   const options: cp.ExecSyncOptions = {
 //     env: process.env
diff --git a/dist/index.js b/dist/index.js
index a5661b6..16c7215 100644
--- a/dist/index.js
+++ b/dist/index.js
@@ -1465,17 +1465,104 @@ var __importStar = (this && this.__importStar) || function (mod) {
 Object.defineProperty(exports, "__esModule", { value: true });
 const github = __importStar(__webpack_require__(469));
 const core = __importStar(__webpack_require__(393));
-function run() {
+function cancelDuplicates(token, selfRunId, owner, repo, workflowId, branch, event) {
     var e_1, _a;
     return __awaiter(this, void 0, void 0, function* () {
+        const octokit = new github.GitHub(token);
+        // Deteermind the workflow to reduce the result set, or reference anothre workflow
+        let resolvedId;
+        if (workflowId === undefined) {
+            const reply = yield octokit.actions.getWorkflowRun({
+                owner,
+                repo,
+                // eslint-disable-next-line @typescript-eslint/camelcase
+                run_id: Number.parseInt(selfRunId)
+            });
+            resolvedId = reply.data.workflow_url.split('/').pop();
+        }
+        else {
+            resolvedId = workflowId;
+        }
+        core.info(`Workflow ID is: ${resolvedId}`);
+        const request = branch === undefined
+            ? {
+                owner,
+                repo,
+                // eslint-disable-next-line @typescript-eslint/camelcase
+                workflow_id: resolvedId
+            }
+            : {
+                owner,
+                repo,
+                // eslint-disable-next-line @typescript-eslint/camelcase
+                workflow_id: resolvedId,
+                branch,
+                event
+            };
+        const listRuns = octokit.actions.listWorkflowRuns.endpoint.merge(request);
+        // If a workflow was provided process everything
+        let matched = workflowId !== undefined;
+        const heads = new Set();
+        try {
+            for (var _b = __asyncValues(octokit.paginate.iterator(listRuns)), _c; _c = yield _b.next(), !_c.done;) {
+                const item = _c.value;
+                // There is some sort of bug where the pagination URLs point to a
+                // different endpoint URL which trips up the resulting representation
+                // In that case, fallback to the actual REST 'workflow_runs' property
+                const elements = item.data.length === undefined ? item.data.workflow_runs : item.data;
+                for (const element of elements) {
+                    core.info(`${element.id} : ${element.workflow_url} : ${element.status} : ${element.run_number}`);
+                    if (!matched) {
+                        if (element.id.toString() !== selfRunId) {
+                            // Skip everything up to this run
+                            continue;
+                        }
+                        matched = true;
+                        core.info(`Matched ${selfRunId}`);
+                    }
+                    if ('completed' === element.status.toString()) {
+                        continue;
+                    }
+                    // This is a set of one in the non-schedule case, otherwise everything is a candidate
+                    const head = `${element.head_repository.full_name}/${element.head_branch}`;
+                    if (!heads.has(head)) {
+                        core.info(`First: ${head}`);
+                        heads.add(head);
+                        continue;
+                    }
+                    core.info(`Cancelling: ${head}`);
+                    yield cancelRun(octokit, owner, repo, element.id);
+                }
+            }
+        }
+        catch (e_1_1) { e_1 = { error: e_1_1 }; }
+        finally {
+            try {
+                if (_c && !_c.done && (_a = _b.return)) yield _a.call(_b);
+            }
+            finally { if (e_1) throw e_1.error; }
+        }
+    });
+}
+function run() {
+    return __awaiter(this, void 0, void 0, function* () {
         try {
             const token = core.getInput('token');
+            core.info(token);
             const selfRunId = getRequiredEnv('GITHUB_RUN_ID');
             const repository = getRequiredEnv('GITHUB_REPOSITORY');
             const eventName = getRequiredEnv('GITHUB_EVENT_NAME');
             const [owner, repo] = repository.split('/');
             const branchPrefix = 'refs/heads/';
             const tagPrefix = 'refs/tags/';
+            if ('schedule' === eventName) {
+                const workflowId = core.getInput('workflow');
+                if (!(workflowId.length > 0)) {
+                    throw new Error('Workflow must be specified for schedule event type');
+                }
+                yield cancelDuplicates(token, selfRunId, owner, repo, workflowId);
+                return;
+            }
             if (!['push', 'pull_request'].includes(eventName)) {
                 core.info('Skipping unsupported event');
                 return;
@@ -1492,50 +1579,7 @@ function run() {
             }
             branch = branch.replace(branchPrefix, '');
             core.info(`Branch is ${branch}, repo is ${repo}, and owner is ${owner}, and id is ${selfRunId}`);
-            const octokit = new github.GitHub(token);
-            const listRuns = octokit.actions.listRepoWorkflowRuns.endpoint.merge({
-                owner,
-                repo,
-                branch,
-                event: pullRequest ? 'pull_request' : 'push'
-            });
-            let matched = false;
-            let workflow = '';
-            let headRepoName = '';
-            try {
-                for (var _b = __asyncValues(octokit.paginate.iterator(listRuns)), _c; _c = yield _b.next(), !_c.done;) {
-                    const item = _c.value;
-                    // There is some sort of bug where the pagination URLs point to a
-                    // different endpoint URL which trips up the resulting representation
-                    // In that case, fallback to the actual REST 'workflow_runs' property
-                    const elements = item.data.length === undefined ? item.data.workflow_runs : item.data;
-                    for (const element of elements) {
-                        core.info(`${element.id} : ${element.workflow_url} : ${element.status} : ${element.run_number}`);
-                        if (!matched) {
-                            if (element.id.toString() === selfRunId) {
-                                matched = true;
-                                workflow = element.workflow_url;
-                                headRepoName = pullRequest ? element.head_repository.full_name : '';
-                            }
-                            // Skip everything up to and matching this run
-                            continue;
-                        }
-                        // Only cancel jobs with the same workflow
-                        if (workflow === element.workflow_url &&
-                            element.status.toString() !== 'completed' &&
-                            (!pullRequest || headRepoName === element.head_repository.full_name)) {
-                            yield cancelRun(octokit, owner, repo, element.id);
-                        }
-                    }
-                }
-            }
-            catch (e_1_1) { e_1 = { error: e_1_1 }; }
-            finally {
-                try {
-                    if (_c && !_c.done && (_a = _b.return)) yield _a.call(_b);
-                }
-                finally { if (e_1) throw e_1.error; }
-            }
+            cancelDuplicates(token, selfRunId, owner, repo, undefined, branch, eventName);
         }
         catch (error) {
             core.setFailed(error.message);
diff --git a/src/main.ts b/src/main.ts
index c99a264..9ae7083 100644
--- a/src/main.ts
+++ b/src/main.ts
@@ -1,10 +1,103 @@
 import * as github from '@actions/github'
 import * as core from '@actions/core'
 
+async function cancelDuplicates(
+  token: string,
+  selfRunId: string,
+  owner: string,
+  repo: string,
+  workflowId?: string,
+  branch?: string,
+  event?: string
+): Promise<void> {
+  const octokit = new github.GitHub(token)
+
+  // Deteermind the workflow to reduce the result set, or reference anothre workflow
+  let resolvedId
+  if (workflowId === undefined) {
+    const reply = await octokit.actions.getWorkflowRun({
+      owner,
+      repo,
+      // eslint-disable-next-line @typescript-eslint/camelcase
+      run_id: Number.parseInt(selfRunId)
+    })
+
+    resolvedId = reply.data.workflow_url.split('/').pop()
+  } else {
+    resolvedId = workflowId
+  }
+
+  core.info(`Workflow ID is: ${resolvedId}`)
+
+  const request =
+    branch === undefined
+      ? {
+          owner,
+          repo,
+          // eslint-disable-next-line @typescript-eslint/camelcase
+          workflow_id: resolvedId
+        }
+      : {
+          owner,
+          repo,
+          // eslint-disable-next-line @typescript-eslint/camelcase
+          workflow_id: resolvedId,
+          branch,
+          event
+        }
+
+  const listRuns = octokit.actions.listWorkflowRuns.endpoint.merge(request)
+
+  // If a workflow was provided process everything
+  let matched = workflowId !== undefined
+  const heads = new Set()
+  for await (const item of octokit.paginate.iterator(listRuns)) {
+    // There is some sort of bug where the pagination URLs point to a
+    // different endpoint URL which trips up the resulting representation
+    // In that case, fallback to the actual REST 'workflow_runs' property
+    const elements =
+      item.data.length === undefined ? item.data.workflow_runs : item.data
+
+    for (const element of elements) {
+      core.info(
+        `${element.id} : ${element.workflow_url} : ${element.status} : ${element.run_number}`
+      )
+
+      if (!matched) {
+        if (element.id.toString() !== selfRunId) {
+          // Skip everything up to this run
+          continue
+        }
+
+        matched = true
+        core.info(`Matched ${selfRunId}`)
+      }
+
+      if ('completed' === element.status.toString()) {
+        continue
+      }
+
+      // This is a set of one in the non-schedule case, otherwise everything is a candidate
+      const head = `${element.head_repository.full_name}/${element.head_branch}`
+      if (!heads.has(head)) {
+        core.info(`First: ${head}`)
+        heads.add(head)
+        continue
+      }
+
+      core.info(`Cancelling: ${head}`)
+
+      await cancelRun(octokit, owner, repo, element.id)
+    }
+  }
+}
+
 async function run(): Promise<void> {
   try {
     const token = core.getInput('token')
 
+    core.info(token)
+
     const selfRunId = getRequiredEnv('GITHUB_RUN_ID')
     const repository = getRequiredEnv('GITHUB_REPOSITORY')
     const eventName = getRequiredEnv('GITHUB_EVENT_NAME')
@@ -13,6 +106,15 @@ async function run(): Promise<void> {
     const branchPrefix = 'refs/heads/'
     const tagPrefix = 'refs/tags/'
 
+    if ('schedule' === eventName) {
+      const workflowId = core.getInput('workflow')
+      if (!(workflowId.length > 0)) {
+        throw new Error('Workflow must be specified for schedule event type')
+      }
+      await cancelDuplicates(token, selfRunId, owner, repo, workflowId)
+      return
+    }
+
     if (!['push', 'pull_request'].includes(eventName)) {
       core.info('Skipping unsupported event')
       return
@@ -35,49 +137,15 @@ async function run(): Promise<void> {
       `Branch is ${branch}, repo is ${repo}, and owner is ${owner}, and id is ${selfRunId}`
     )
 
-    const octokit = new github.GitHub(token)
-    const listRuns = octokit.actions.listRepoWorkflowRuns.endpoint.merge({
+    cancelDuplicates(
+      token,
+      selfRunId,
       owner,
       repo,
+      undefined,
       branch,
-      event: pullRequest ? 'pull_request' : 'push'
-    })
-
-    let matched = false
-    let workflow = ''
-    let headRepoName = ''
-    for await (const item of octokit.paginate.iterator(listRuns)) {
-      // There is some sort of bug where the pagination URLs point to a
-      // different endpoint URL which trips up the resulting representation
-      // In that case, fallback to the actual REST 'workflow_runs' property
-      const elements =
-        item.data.length === undefined ? item.data.workflow_runs : item.data
-
-      for (const element of elements) {
-        core.info(
-          `${element.id} : ${element.workflow_url} : ${element.status} : ${element.run_number}`
-        )
-
-        if (!matched) {
-          if (element.id.toString() === selfRunId) {
-            matched = true
-            workflow = element.workflow_url
-            headRepoName = pullRequest ? element.head_repository.full_name : ''
-          }
-          // Skip everything up to and matching this run
-          continue
-        }
-
-        // Only cancel jobs with the same workflow
-        if (
-          workflow === element.workflow_url &&
-          element.status.toString() !== 'completed' &&
-          (!pullRequest || headRepoName === element.head_repository.full_name)
-        ) {
-          await cancelRun(octokit, owner, repo, element.id)
-        }
-      }
-    }
+      eventName
+    )
   } catch (error) {
     core.setFailed(error.message)
   }