You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/05 21:53:58 UTC
[beam] branch master updated: [BEAM-6081]: Create "Dataflow Reaper"
infrastructure to periodically clean up stuck Dataflow jobs (#7985)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 991285c [BEAM-6081]: Create "Dataflow Reaper" infrastructure to periodically clean up stuck Dataflow jobs (#7985)
991285c is described below
commit 991285c891a861606edfc6ed01cbe4a29d615839
Author: Alan Myrvold <al...@comcast.net>
AuthorDate: Tue Mar 5 13:53:46 2019 -0800
[BEAM-6081]: Create "Dataflow Reaper" infrastructure to periodically clean up stuck Dataflow jobs (#7985)
* [BEAM-6081]: Create "Dataflow Reaper" infrastructure to periodically clean up stuck Dataflow jobs
---
.../jenkins/job_CancelStaleDataflowJobs.groovy | 45 ++++++++
.test-infra/tools/build.gradle | 42 ++++++++
.test-infra/tools/stale_dataflow_jobs_cleaner.go | 117 +++++++++++++++++++++
.../tools/stale_dataflow_jobs_cleaner_test.go | 74 +++++++++++++
settings.gradle | 2 +
5 files changed, 280 insertions(+)
diff --git a/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy b/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy
new file mode 100644
index 0000000..20760e8
--- /dev/null
+++ b/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+
+// Jenkins job that runs the stale Dataflow job cleaner from the
+// :beam-test-tools Gradle project (.test-infra/tools, per settings.gradle).
+job("beam_CancelStaleDataflowJobs") {
+ description("Cancel stale dataflow jobs")
+
+ // Set common parameters shared by top-level Beam Jenkins jobs.
+ commonJobProperties.setTopLevelMainJobProperties(delegate)
+
+ // Run as a daily cron job: 'H H * * *' lets Jenkins pick the hour and
+ // minute (hash-derived per job) rather than a fixed time.
+ commonJobProperties.setCronJob(delegate, 'H H * * *')
+
+ // Allows triggering this build from a pull request comment.
+ // NOTE(review): a PR-triggered run cancels real jobs exactly like the cron
+ // run does, since the Gradle task below runs the cleaner itself.
+ commonJobProperties.enablePhraseTriggeringFromPullRequest(
+ delegate,
+ 'Cancel Stale Dataflow Jobs',
+ 'Run Cancel Stale Dataflow Jobs')
+
+ // Gradle goals for this job: the tools' checks (which include the Go unit
+ // tests), then the cleaner task itself.
+ steps {
+ gradle {
+ rootBuildScriptDir(commonJobProperties.checkoutDir)
+ tasks(':beam-test-tools:check')
+ tasks(':beam-test-tools:cancelStaleDataflowJobs')
+ commonJobProperties.setGradleSwitches(delegate)
+ }
+ }
+}
diff --git a/.test-infra/tools/build.gradle b/.test-infra/tools/build.gradle
new file mode 100644
index 0000000..8fd8473
--- /dev/null
+++ b/.test-infra/tools/build.gradle
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build for the Go test-infrastructure tools in .test-infra/tools
+// (Gradle project :beam-test-tools).
+plugins {
+ id 'org.apache.beam.module'
+}
+
+// Beam build-plugin helper that configures this as a Go project
+// (presumably via gogradle, given the task types used below — confirm).
+applyGoNature()
+
+repositories { mavenCentral() }
+
+// Also remove the .gogradle directory (presumably gogradle's local working
+// state) on clean.
+clean {
+ delete '.gogradle'
+}
+
+// Go import path under which this package is built.
+golang {
+ packagePath = 'github.com/apache/beam/testing/tools'
+}
+
+// Run the Go unit tests as part of `check`.
+check.dependsOn goTest
+
+// Fetches the cleaner's client libraries and runs the cleaner.
+// NOTE(review): the `go get` calls carry no version, so each run uses whatever
+// is latest; consider pinning these as declared dependencies for reproducibility.
+task cancelStaleDataflowJobs(type: com.github.blindpirate.gogradle.Go) {
+ dependsOn goVendor
+ go('get golang.org/x/oauth2/google')
+ go('get google.golang.org/api/dataflow/v1b3')
+ go('run stale_dataflow_jobs_cleaner.go')
+}
diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner.go b/.test-infra/tools/stale_dataflow_jobs_cleaner.go
new file mode 100644
index 0000000..6361e27
--- /dev/null
+++ b/.test-infra/tools/stale_dataflow_jobs_cleaner.go
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+ "log"
+ "strings"
+ "time"
+
+ "golang.org/x/oauth2/google"
+ df "google.golang.org/api/dataflow/v1b3"
+)
+
+// longRunningPrefix exempts jobs from cleanup: cleanDataflowJobs never cancels
+// a job whose name begins with it (presumably such jobs are intentionally
+// long-lived).
+const longRunningPrefix = "long-running-"
+
+// client abstracts the Dataflow operations the cleaner relies on (reading the
+// clock, listing jobs, cancelling a job) so tests can substitute a fake.
+type client interface {
+	// CurrentTime returns the instant that job ages are measured against.
+	CurrentTime() time.Time
+	// ListJobs returns the jobs of the given project to examine.
+	ListJobs(projectID string) ([]*df.Job, error)
+	// CancelJob requests termination of job.
+	CancelJob(job *df.Job) error
+}
+
+// dataflowClient implements the client interface for Google Cloud Dataflow.
+type dataflowClient struct {
+ s *df.ProjectsJobsService
+}
+
+// newDataflowClient builds a dataflowClient whose HTTP client comes from
+// google.DefaultClient (Application Default Credentials) with the
+// cloud-platform scope. Any error from either setup step is returned as-is.
+func newDataflowClient() (*dataflowClient, error) {
+	httpClient, err := google.DefaultClient(context.Background(), df.CloudPlatformScope)
+	if err != nil {
+		return nil, err
+	}
+	svc, err := df.New(httpClient)
+	if err != nil {
+		return nil, err
+	}
+	jobs := df.NewProjectsJobsService(svc)
+	return &dataflowClient{s: jobs}, nil
+}
+
+// CurrentTime reports the wall-clock time, as given by time.Now.
+func (c dataflowClient) CurrentTime() time.Time { return time.Now() }
+
+// ListJobs lists the active Dataflow jobs for a project.
+func (c dataflowClient) ListJobs(projectId string) ([]*df.Job, error) {
+ resp, err := c.s.Aggregated(projectId).Filter("ACTIVE").Fields("jobs(id,name,projectId,createTime)").Do()
+ if err != nil {
+ return nil, err
+ }
+ return resp.Jobs, nil
+}
+
+// CancelJob asks Dataflow to terminate job by setting its requested state to
+// JOB_STATE_DONE through projects.jobs.update, returning any API error.
+//
+// NOTE(review): the Dataflow REST reference says projects.jobs.update (without
+// a location) can only update jobs running in us-central1; jobs in other
+// regions likely need projects.locations.jobs.update — confirm.
+func (c dataflowClient) CancelJob(job *df.Job) error {
+	update := c.s.Update(job.ProjectId, job.Id, &df.Job{RequestedState: "JOB_STATE_DONE"})
+	if _, err := update.Do(); err != nil {
+		return err
+	}
+	return nil
+}
+
+// cleanDataflowJobs cancels stale Dataflow jobs, excluding the longRunningPrefix prefixed jobs.
+func cleanDataflowJobs(c client, projectId string, hoursStale float64) error {
+ now := c.CurrentTime()
+ jobs, err := c.ListJobs(projectId)
+ if err != nil {
+ return err
+ }
+ for _, j := range jobs {
+ t, err := time.Parse(time.RFC3339, j.CreateTime)
+ if err != nil {
+ return err
+ }
+ hoursSinceCreate := now.Sub(t).Hours()
+ log.Printf("Job %v %v %v %v %.2f\n", j.ProjectId, j.Id, j.Name, j.CreateTime, hoursSinceCreate)
+ if hoursSinceCreate > hoursStale && !strings.HasPrefix(j.Name, longRunningPrefix) {
+ log.Printf("Attempting to cancel %v\n", j.Id)
+ c.CancelJob(j)
+ }
+ }
+ return nil
+}
+
+func main() {
+ client, err := newDataflowClient()
+ if err != nil {
+ log.Fatalf("Error creating dataflow client, %v", err)
+ }
+ // Cancel any jobs older than 3 hours.
+ err = cleanDataflowJobs(client, "apache-beam-testing", 3.0)
+ if err != nil {
+ log.Fatalf("Error cleaning dataflow jobs, %v", err)
+ }
+ log.Printf("Done")
+}
diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go b/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go
new file mode 100644
index 0000000..342052a
--- /dev/null
+++ b/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ df "google.golang.org/api/dataflow/v1b3"
+ "reflect"
+ "testing"
+ "time"
+)
+
+// Package-level state shared between the tests and fakeClient.
+var (
+	// currentTime is the fixed "now" reported by fakeClient.CurrentTime.
+	currentTime = time.Now()
+	// jobsReturned is the job list handed back by fakeClient.ListJobs.
+	jobsReturned = []*df.Job{}
+	// cancelledJobs records, in call order, every job given to fakeClient.CancelJob.
+	cancelledJobs = []*df.Job{}
+)
+
+// fakeClient is an in-memory client: it reports currentTime, lists
+// jobsReturned, and appends every cancellation request to cancelledJobs.
+type fakeClient struct{}
+
+// Compile-time check that fakeClient satisfies the client interface.
+var _ client = fakeClient{}
+
+func (fakeClient) CurrentTime() time.Time {
+	return currentTime
+}
+
+func (fakeClient) ListJobs(string) ([]*df.Job, error) {
+	return jobsReturned, nil
+}
+
+func (fakeClient) CancelJob(job *df.Job) error {
+	cancelledJobs = append(cancelledJobs, job)
+	return nil
+}
+
+// helperForJobCancel points fakeClient at jobList, runs cleanDataflowJobs with
+// the given hoursStale threshold, and fails the test if cleanDataflowJobs
+// returns an error or if the jobs passed to CancelJob differ from
+// expectedCancelled (compared in order).
+func helperForJobCancel(t *testing.T, hoursStale float64, jobList []*df.Job, expectedCancelled []*df.Job) {
+	t.Helper()
+	var c fakeClient
+	jobsReturned = jobList
+	cancelledJobs = []*df.Job{}
+	if err := cleanDataflowJobs(c, "some-project-id", hoursStale); err != nil {
+		t.Fatalf("cleanDataflowJobs returned unexpected error: %v", err)
+	}
+	if !reflect.DeepEqual(cancelledJobs, expectedCancelled) {
+		t.Errorf("Cancelled arrays not as expected actual=%v, expected=%v", cancelledJobs, expectedCancelled)
+	}
+}
+
+// TestEmptyJobList checks that nothing is cancelled when no jobs are listed.
+func TestEmptyJobList(t *testing.T) {
+	none := []*df.Job{}
+	helperForJobCancel(t, 2.0, none, none)
+}
+
+// TestNotExpiredJob checks that a job one second short of the threshold is kept.
+func TestNotExpiredJob(t *testing.T) {
+	// Just under 2 hours.
+	age := 2*time.Hour - time.Second
+	created := currentTime.Add(-age).Format(time.RFC3339)
+	jobs := []*df.Job{{CreateTime: created}}
+	helperForJobCancel(t, 2.0, jobs, []*df.Job{})
+}
+
+// TestExpiredJob checks that a job created just over the threshold is cancelled.
+func TestExpiredJob(t *testing.T) {
+	// Just over 2 hours.
+	createTime := currentTime.Add(-(2*time.Hour + time.Second))
+	job := &df.Job{CreateTime: createTime.Format(time.RFC3339)}
+	helperForJobCancel(t, 2.0, []*df.Job{job}, []*df.Job{job})
+}
+
+// TestLongRunningJobNotCancelled checks that a stale job whose name carries
+// longRunningPrefix is spared, while an equally old job without the prefix is
+// still cancelled.
+func TestLongRunningJobNotCancelled(t *testing.T) {
+	createTime := currentTime.Add(-(2*time.Hour + time.Second)).Format(time.RFC3339)
+	longRunning := &df.Job{Name: longRunningPrefix + "streaming", CreateTime: createTime}
+	regular := &df.Job{Name: "batch-job", CreateTime: createTime}
+	helperForJobCancel(t, 2.0, []*df.Job{longRunning, regular}, []*df.Job{regular})
+}
diff --git a/settings.gradle b/settings.gradle
index 1f22cb9..b8fc633 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -229,3 +229,5 @@ include "beam-test-infra-metrics"
project(":beam-test-infra-metrics").dir = file(".test-infra/metrics")
include "beam-sdks-java-bom"
project(":beam-sdks-java-bom").dir = file("sdks/java/bom")
+include "beam-test-tools"
+project(":beam-test-tools").dir = file(".test-infra/tools")