You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/01/27 21:01:01 UTC
[beam] branch release-2.19.0 updated: [BEAM-8939] Cherry-pick: A bash script that cancels stale dataflow jobs
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch release-2.19.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.19.0 by this push:
new 7e2a165 [BEAM-8939] Cherry-pick: A bash script that cancels stale dataflow jobs
new 5286261 Merge pull request #10695 from boyuanzz/cherry-pick
7e2a165 is described below
commit 7e2a16533cda9151861a26f716a59c6994491e0c
Author: Kamil Wasilewski <ka...@polidea.com>
AuthorDate: Tue Jan 14 09:33:45 2020 +0100
[BEAM-8939] Cherry-pick: A bash script that cancels stale dataflow jobs
(cherry picked from commit 4d3295c4279a853758e4218b437a92edba63bd89)
---
.../jenkins/job_CancelStaleDataflowJobs.groovy | 1 -
.test-infra/tools/build.gradle | 25 +----
.test-infra/tools/stale_dataflow_jobs_cleaner.go | 117 ---------------------
.test-infra/tools/stale_dataflow_jobs_cleaner.sh | 23 ++++
.../tools/stale_dataflow_jobs_cleaner_test.go | 74 -------------
5 files changed, 25 insertions(+), 215 deletions(-)
diff --git a/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy b/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy
index a03a1d0..e32d14a 100644
--- a/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy
+++ b/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy
@@ -37,7 +37,6 @@ job("beam_CancelStaleDataflowJobs") {
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
- tasks(':beam-test-tools:check')
tasks(':beam-test-tools:cancelStaleDataflowJobs')
commonJobProperties.setGradleSwitches(delegate)
}
diff --git a/.test-infra/tools/build.gradle b/.test-infra/tools/build.gradle
index aabeca0..53445b6 100644
--- a/.test-infra/tools/build.gradle
+++ b/.test-infra/tools/build.gradle
@@ -16,27 +16,6 @@
* limitations under the License.
*/
-plugins {
- id 'org.apache.beam.module'
-}
-
-applyGoNature()
-
-repositories { mavenCentral() }
-
-clean {
- delete '.gogradle'
-}
-
-golang {
- packagePath = 'github.com/apache/beam/.test-infra/tools'
-}
-
-check.dependsOn goTest
-
-task cancelStaleDataflowJobs(type: com.github.blindpirate.gogradle.Go) {
- dependsOn goVendor
- go('get golang.org/x/oauth2/google')
- go('get google.golang.org/api/dataflow/v1b3')
- go('run stale_dataflow_jobs_cleaner.go')
+task cancelStaleDataflowJobs(type: Exec) {
+ commandLine './stale_dataflow_jobs_cleaner.sh'
}
diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner.go b/.test-infra/tools/stale_dataflow_jobs_cleaner.go
deleted file mode 100644
index 6361e27..0000000
--- a/.test-infra/tools/stale_dataflow_jobs_cleaner.go
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- "context"
- "log"
- "strings"
- "time"
-
- "golang.org/x/oauth2/google"
- df "google.golang.org/api/dataflow/v1b3"
-)
-
-const (
- longRunningPrefix = "long-running-"
-)
-
-// client contains methods for listing and cancelling jobs, extracted to allow easier testing.
-type client interface {
- CurrentTime() time.Time
- ListJobs(projectId string) ([]*df.Job, error)
- CancelJob(job *df.Job) error
-}
-
-// dataflowClient implements the client interface for Google Cloud Dataflow.
-type dataflowClient struct {
- s *df.ProjectsJobsService
-}
-
-// newDataflowClient creates a new Dataflow ProjectsJobsService.
-func newDataflowClient() (*dataflowClient, error) {
- ctx := context.Background()
- cl, err := google.DefaultClient(ctx, df.CloudPlatformScope)
- if err != nil {
- return nil, err
- }
- service, err := df.New(cl)
- if err != nil {
- return nil, err
- }
- return &dataflowClient{s: df.NewProjectsJobsService(service)}, nil
-}
-
-// CurrentTime gets the time Now.
-func (c dataflowClient) CurrentTime() time.Time {
- return time.Now()
-}
-
-// ListJobs lists the active Dataflow jobs for a project.
-func (c dataflowClient) ListJobs(projectId string) ([]*df.Job, error) {
- resp, err := c.s.Aggregated(projectId).Filter("ACTIVE").Fields("jobs(id,name,projectId,createTime)").Do()
- if err != nil {
- return nil, err
- }
- return resp.Jobs, nil
-}
-
-// CancelJob requests the cancellation od a Dataflow job.
-func (c dataflowClient) CancelJob(job *df.Job) error {
- jobDone := df.Job{
- RequestedState: "JOB_STATE_DONE",
- }
- _, err := c.s.Update(job.ProjectId, job.Id, &jobDone).Do()
- return err
-}
-
-// cleanDataflowJobs cancels stale Dataflow jobs, excluding the longRunningPrefix prefixed jobs.
-func cleanDataflowJobs(c client, projectId string, hoursStale float64) error {
- now := c.CurrentTime()
- jobs, err := c.ListJobs(projectId)
- if err != nil {
- return err
- }
- for _, j := range jobs {
- t, err := time.Parse(time.RFC3339, j.CreateTime)
- if err != nil {
- return err
- }
- hoursSinceCreate := now.Sub(t).Hours()
- log.Printf("Job %v %v %v %v %.2f\n", j.ProjectId, j.Id, j.Name, j.CreateTime, hoursSinceCreate)
- if hoursSinceCreate > hoursStale && !strings.HasPrefix(j.Name, longRunningPrefix) {
- log.Printf("Attempting to cancel %v\n", j.Id)
- c.CancelJob(j)
- }
- }
- return nil
-}
-
-func main() {
- client, err := newDataflowClient()
- if err != nil {
- log.Fatalf("Error creating dataflow client, %v", err)
- }
- // Cancel any jobs older than 3 hours.
- err = cleanDataflowJobs(client, "apache-beam-testing", 3.0)
- if err != nil {
- log.Fatalf("Error cleaning dataflow jobs, %v", err)
- }
- log.Printf("Done")
-}
diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner.sh b/.test-infra/tools/stale_dataflow_jobs_cleaner.sh
new file mode 100755
index 0000000..66bf880
--- /dev/null
+++ b/.test-infra/tools/stale_dataflow_jobs_cleaner.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Cancels active Dataflow jobs older than 3 hours.
+#
+set -euo pipefail
+
+gcloud dataflow jobs list --created-before=-P3H --format='value(JOB_ID)' \
+--status=active --region=us-central1 | xargs gcloud dataflow jobs cancel
diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go b/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go
deleted file mode 100644
index 342052a..0000000
--- a/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- df "google.golang.org/api/dataflow/v1b3"
- "reflect"
- "testing"
- "time"
-)
-
-var (
- currentTime time.Time = time.Now()
- jobsReturned = []*df.Job{}
- cancelledJobs = []*df.Job{}
-)
-
-type fakeClient struct{}
-
-func (c fakeClient) ListJobs(projectId string) ([]*df.Job, error) {
- return jobsReturned, nil
-}
-
-func (c fakeClient) CancelJob(job *df.Job) error {
- cancelledJobs = append(cancelledJobs, job)
- return nil
-}
-
-func (c fakeClient) CurrentTime() time.Time {
- return currentTime
-}
-
-func helperForJobCancel(t *testing.T, hoursStale float64, jobList []*df.Job, expectedCancelled []*df.Job) {
- var c fakeClient
- jobsReturned = jobList
- cancelledJobs = []*df.Job{}
- cleanDataflowJobs(c, "some-project-id", 2.0)
- if !reflect.DeepEqual(cancelledJobs, expectedCancelled) {
- t.Errorf("Cancelled arrays not as expected actual=%v, expected=%v", cancelledJobs, expectedCancelled)
- }
-}
-
-func TestEmptyJobList(t *testing.T) {
- helperForJobCancel(t, 2.0, []*df.Job{}, []*df.Job{})
-}
-
-func TestNotExpiredJob(t *testing.T) {
- // Just under 2 hours.
- createTime := currentTime.Add(-(2*time.Hour - time.Second))
- helperForJobCancel(t, 2.0, []*df.Job{&df.Job{CreateTime: createTime.Format(time.RFC3339)}}, []*df.Job{})
-}
-
-func TestExpiredJob(t *testing.T) {
- // Just over 2 hours.
- createTime := currentTime.Add(-(2*time.Hour + time.Second))
- job := &df.Job{CreateTime: createTime.Format(time.RFC3339)}
- helperForJobCancel(t, 2.0, []*df.Job{job}, []*df.Job{job})
-}