You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/01/27 21:01:01 UTC

[beam] branch release-2.19.0 updated: [BEAM-8939] Cherry-pick: A bash script that cancels stale dataflow jobs

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch release-2.19.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.19.0 by this push:
     new 7e2a165  [BEAM-8939] Cherry-pick: A bash script that cancels stale dataflow jobs
     new 5286261  Merge pull request #10695 from boyuanzz/cherry-pick
7e2a165 is described below

commit 7e2a16533cda9151861a26f716a59c6994491e0c
Author: Kamil Wasilewski <ka...@polidea.com>
AuthorDate: Tue Jan 14 09:33:45 2020 +0100

    [BEAM-8939] Cherry-pick: A bash script that cancels stale dataflow jobs
    
    (cherry picked from commit 4d3295c4279a853758e4218b437a92edba63bd89)
---
 .../jenkins/job_CancelStaleDataflowJobs.groovy     |   1 -
 .test-infra/tools/build.gradle                     |  25 +----
 .test-infra/tools/stale_dataflow_jobs_cleaner.go   | 117 ---------------------
 .test-infra/tools/stale_dataflow_jobs_cleaner.sh   |  23 ++++
 .../tools/stale_dataflow_jobs_cleaner_test.go      |  74 -------------
 5 files changed, 25 insertions(+), 215 deletions(-)

diff --git a/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy b/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy
index a03a1d0..e32d14a 100644
--- a/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy
+++ b/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy
@@ -37,7 +37,6 @@ job("beam_CancelStaleDataflowJobs") {
   steps {
     gradle {
       rootBuildScriptDir(commonJobProperties.checkoutDir)
-      tasks(':beam-test-tools:check')
       tasks(':beam-test-tools:cancelStaleDataflowJobs')
       commonJobProperties.setGradleSwitches(delegate)
     }
diff --git a/.test-infra/tools/build.gradle b/.test-infra/tools/build.gradle
index aabeca0..53445b6 100644
--- a/.test-infra/tools/build.gradle
+++ b/.test-infra/tools/build.gradle
@@ -16,27 +16,6 @@
  * limitations under the License.
  */
 
-plugins {
-  id 'org.apache.beam.module'
-}
-
-applyGoNature()
-
-repositories { mavenCentral() }
-
-clean {
-  delete '.gogradle'
-}
-
-golang {
-  packagePath = 'github.com/apache/beam/.test-infra/tools'
-}
-
-check.dependsOn goTest
-
-task cancelStaleDataflowJobs(type: com.github.blindpirate.gogradle.Go) {
-  dependsOn goVendor
-  go('get golang.org/x/oauth2/google')
-  go('get google.golang.org/api/dataflow/v1b3')
-  go('run stale_dataflow_jobs_cleaner.go')
+task cancelStaleDataflowJobs(type: Exec) {
+  commandLine './stale_dataflow_jobs_cleaner.sh'
 }
diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner.go b/.test-infra/tools/stale_dataflow_jobs_cleaner.go
deleted file mode 100644
index 6361e27..0000000
--- a/.test-infra/tools/stale_dataflow_jobs_cleaner.go
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
-	"context"
-	"log"
-	"strings"
-	"time"
-
-	"golang.org/x/oauth2/google"
-	df "google.golang.org/api/dataflow/v1b3"
-)
-
-const (
-	longRunningPrefix = "long-running-"
-)
-
-// client contains methods for listing and cancelling jobs, extracted to allow easier testing.
-type client interface {
-	CurrentTime() time.Time
-	ListJobs(projectId string) ([]*df.Job, error)
-	CancelJob(job *df.Job) error
-}
-
-// dataflowClient implements the client interface for Google Cloud Dataflow.
-type dataflowClient struct {
-	s *df.ProjectsJobsService
-}
-
-// newDataflowClient creates a new Dataflow ProjectsJobsService.
-func newDataflowClient() (*dataflowClient, error) {
-	ctx := context.Background()
-	cl, err := google.DefaultClient(ctx, df.CloudPlatformScope)
-	if err != nil {
-		return nil, err
-	}
-	service, err := df.New(cl)
-	if err != nil {
-		return nil, err
-	}
-	return &dataflowClient{s: df.NewProjectsJobsService(service)}, nil
-}
-
-// CurrentTime gets the time Now.
-func (c dataflowClient) CurrentTime() time.Time {
-	return time.Now()
-}
-
-// ListJobs lists the active Dataflow jobs for a project.
-func (c dataflowClient) ListJobs(projectId string) ([]*df.Job, error) {
-	resp, err := c.s.Aggregated(projectId).Filter("ACTIVE").Fields("jobs(id,name,projectId,createTime)").Do()
-	if err != nil {
-		return nil, err
-	}
-	return resp.Jobs, nil
-}
-
-// CancelJob requests the cancellation od a Dataflow job.
-func (c dataflowClient) CancelJob(job *df.Job) error {
-	jobDone := df.Job{
-		RequestedState: "JOB_STATE_DONE",
-	}
-	_, err := c.s.Update(job.ProjectId, job.Id, &jobDone).Do()
-	return err
-}
-
-// cleanDataflowJobs cancels stale Dataflow jobs, excluding the longRunningPrefix prefixed jobs.
-func cleanDataflowJobs(c client, projectId string, hoursStale float64) error {
-	now := c.CurrentTime()
-	jobs, err := c.ListJobs(projectId)
-	if err != nil {
-		return err
-	}
-	for _, j := range jobs {
-		t, err := time.Parse(time.RFC3339, j.CreateTime)
-		if err != nil {
-			return err
-		}
-		hoursSinceCreate := now.Sub(t).Hours()
-		log.Printf("Job %v %v %v %v %.2f\n", j.ProjectId, j.Id, j.Name, j.CreateTime, hoursSinceCreate)
-		if hoursSinceCreate > hoursStale && !strings.HasPrefix(j.Name, longRunningPrefix) {
-			log.Printf("Attempting to cancel %v\n", j.Id)
-			c.CancelJob(j)
-		}
-	}
-	return nil
-}
-
-func main() {
-	client, err := newDataflowClient()
-	if err != nil {
-		log.Fatalf("Error creating dataflow client, %v", err)
-	}
-	// Cancel any jobs older than 3 hours.
-	err = cleanDataflowJobs(client, "apache-beam-testing", 3.0)
-	if err != nil {
-		log.Fatalf("Error cleaning dataflow jobs, %v", err)
-	}
-	log.Printf("Done")
-}
diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner.sh b/.test-infra/tools/stale_dataflow_jobs_cleaner.sh
new file mode 100755
index 0000000..66bf880
--- /dev/null
+++ b/.test-infra/tools/stale_dataflow_jobs_cleaner.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+#    Cancels active Dataflow jobs older than 3 hours.
+#
+set -euo pipefail
+
+gcloud dataflow jobs list --created-before=-P3H --format='value(JOB_ID)' \
+--status=active --region=us-central1 | xargs gcloud dataflow jobs cancel
diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go b/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go
deleted file mode 100644
index 342052a..0000000
--- a/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
-	df "google.golang.org/api/dataflow/v1b3"
-	"reflect"
-	"testing"
-	"time"
-)
-
-var (
-	currentTime   time.Time = time.Now()
-	jobsReturned            = []*df.Job{}
-	cancelledJobs           = []*df.Job{}
-)
-
-type fakeClient struct{}
-
-func (c fakeClient) ListJobs(projectId string) ([]*df.Job, error) {
-	return jobsReturned, nil
-}
-
-func (c fakeClient) CancelJob(job *df.Job) error {
-	cancelledJobs = append(cancelledJobs, job)
-	return nil
-}
-
-func (c fakeClient) CurrentTime() time.Time {
-	return currentTime
-}
-
-func helperForJobCancel(t *testing.T, hoursStale float64, jobList []*df.Job, expectedCancelled []*df.Job) {
-	var c fakeClient
-	jobsReturned = jobList
-	cancelledJobs = []*df.Job{}
-	cleanDataflowJobs(c, "some-project-id", 2.0)
-	if !reflect.DeepEqual(cancelledJobs, expectedCancelled) {
-		t.Errorf("Cancelled arrays not as expected actual=%v, expected=%v", cancelledJobs, expectedCancelled)
-	}
-}
-
-func TestEmptyJobList(t *testing.T) {
-	helperForJobCancel(t, 2.0, []*df.Job{}, []*df.Job{})
-}
-
-func TestNotExpiredJob(t *testing.T) {
-	// Just under 2 hours.
-	createTime := currentTime.Add(-(2*time.Hour - time.Second))
-	helperForJobCancel(t, 2.0, []*df.Job{&df.Job{CreateTime: createTime.Format(time.RFC3339)}}, []*df.Job{})
-}
-
-func TestExpiredJob(t *testing.T) {
-	// Just over 2 hours.
-	createTime := currentTime.Add(-(2*time.Hour + time.Second))
-	job := &df.Job{CreateTime: createTime.Format(time.RFC3339)}
-	helperForJobCancel(t, 2.0, []*df.Job{job}, []*df.Job{job})
-}