You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/05 21:53:58 UTC

[beam] branch master updated: [BEAM-6081]: Create "Dataflow Reaper" infrastructure to periodically clean up stuck Dataflow jobs (#7985)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 991285c  [BEAM-6081]: Create "Dataflow Reaper" infrastructure to periodically clean up stuck Dataflow jobs (#7985)
991285c is described below

commit 991285c891a861606edfc6ed01cbe4a29d615839
Author: Alan Myrvold <al...@comcast.net>
AuthorDate: Tue Mar 5 13:53:46 2019 -0800

    [BEAM-6081]: Create "Dataflow Reaper" infrastructure to periodically clean up stuck Dataflow jobs (#7985)
    
    * [BEAM-6081]: Create "Dataflow Reaper" infrastructure to periodically clean up stuck Dataflow jobs
---
 .../jenkins/job_CancelStaleDataflowJobs.groovy     |  45 ++++++++
 .test-infra/tools/build.gradle                     |  42 ++++++++
 .test-infra/tools/stale_dataflow_jobs_cleaner.go   | 117 +++++++++++++++++++++
 .../tools/stale_dataflow_jobs_cleaner_test.go      |  74 +++++++++++++
 settings.gradle                                    |   2 +
 5 files changed, 280 insertions(+)

diff --git a/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy b/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy
new file mode 100644
index 0000000..20760e8
--- /dev/null
+++ b/.test-infra/jenkins/job_CancelStaleDataflowJobs.groovy
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+
+job("beam_CancelStaleDataflowJobs") {
+  description("Cancel stale dataflow jobs")
+
+  // Set common parameters.
+  commonJobProperties.setTopLevelMainJobProperties(delegate)
+
+  // Run once per day. Jenkins' 'H' token picks a hashed (per-job, stable)
+  // minute and hour, spreading load instead of firing at a fixed time.
+  commonJobProperties.setCronJob(delegate, 'H H * * *')
+
+  // Allows triggering this build against pull requests.
+  commonJobProperties.enablePhraseTriggeringFromPullRequest(
+    delegate,
+    'Cancel Stale Dataflow Jobs',
+    'Run Cancel Stale Dataflow Jobs')
+
+  // Gradle goals for this job: the tool's checks and the cleaner itself.
+  steps {
+    gradle {
+      rootBuildScriptDir(commonJobProperties.checkoutDir)
+      tasks(':beam-test-tools:check')
+      tasks(':beam-test-tools:cancelStaleDataflowJobs')
+      commonJobProperties.setGradleSwitches(delegate)
+    }
+  }
+}
diff --git a/.test-infra/tools/build.gradle b/.test-infra/tools/build.gradle
new file mode 100644
index 0000000..8fd8473
--- /dev/null
+++ b/.test-infra/tools/build.gradle
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+  id 'org.apache.beam.module'
+}
+
+// Configure this project as a Go project. applyGoNature() is presumably
+// supplied by the org.apache.beam.module plugin above and sets up the gogradle
+// tasks (goVendor, goTest) referenced below — TODO confirm.
+applyGoNature()
+
+repositories { mavenCentral() }
+
+// Also delete the .gogradle directory on `gradle clean`.
+clean {
+  delete '.gogradle'
+}
+
+// Go import path for the package in this directory.
+golang {
+  packagePath = 'github.com/apache/beam/testing/tools'
+}
+
+// Make `check` depend on goTest so the Go unit tests run as part of `check`.
+check.dependsOn goTest
+
+// Runs the stale Dataflow job cleaner. The `go get` calls fetch the OAuth2
+// and Dataflow client libraries that stale_dataflow_jobs_cleaner.go imports,
+// then `go run` executes it with no arguments.
+task cancelStaleDataflowJobs(type: com.github.blindpirate.gogradle.Go) {
+  dependsOn goVendor
+  go('get golang.org/x/oauth2/google')
+  go('get google.golang.org/api/dataflow/v1b3')
+  go('run stale_dataflow_jobs_cleaner.go')
+}
diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner.go b/.test-infra/tools/stale_dataflow_jobs_cleaner.go
new file mode 100644
index 0000000..6361e27
--- /dev/null
+++ b/.test-infra/tools/stale_dataflow_jobs_cleaner.go
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"context"
+	"flag"
+	"log"
+	"strings"
+	"time"
+
+	"golang.org/x/oauth2/google"
+	df "google.golang.org/api/dataflow/v1b3"
+)
+
+// longRunningPrefix exempts jobs from cleanup: any job whose name starts with
+// this prefix is left running regardless of its age.
+const longRunningPrefix = "long-running-"
+
+// client is the narrow set of Dataflow operations the cleaner relies on. It is
+// an interface so tests can swap in a fake implementation.
+type client interface {
+	// ListJobs returns the jobs to consider for projectId.
+	ListJobs(projectId string) ([]*df.Job, error)
+	// CancelJob asks the service to stop job.
+	CancelJob(job *df.Job) error
+	// CurrentTime is the reference time used to compute job age.
+	CurrentTime() time.Time
+}
+
+// dataflowClient is the production client, a thin wrapper over the Dataflow
+// v1b3 ProjectsJobsService.
+type dataflowClient struct {
+	s *df.ProjectsJobsService
+}
+
+// newDataflowClient builds a dataflowClient whose HTTP client comes from
+// google.DefaultClient (Application Default Credentials) with the
+// CloudPlatformScope scope. Any credential or service-construction error is
+// returned unchanged.
+func newDataflowClient() (*dataflowClient, error) {
+	httpClient, err := google.DefaultClient(context.Background(), df.CloudPlatformScope)
+	if err != nil {
+		return nil, err
+	}
+	svc, err := df.New(httpClient)
+	if err != nil {
+		return nil, err
+	}
+	jobs := df.NewProjectsJobsService(svc)
+	return &dataflowClient{s: jobs}, nil
+}
+
+// CurrentTime returns time.Now(); it is a method so tests can use a fixed clock.
+func (dataflowClient) CurrentTime() time.Time {
+	now := time.Now()
+	return now
+}
+
+// ListJobs lists the active Dataflow jobs for a project.
+func (c dataflowClient) ListJobs(projectId string) ([]*df.Job, error) {
+	resp, err := c.s.Aggregated(projectId).Filter("ACTIVE").Fields("jobs(id,name,projectId,createTime)").Do()
+	if err != nil {
+		return nil, err
+	}
+	return resp.Jobs, nil
+}
+
+// CancelJob requests the cancellation of a Dataflow job by updating its
+// requested state to JOB_STATE_DONE. Only job.ProjectId and job.Id are read.
+// NOTE(review): the Dataflow API also accepts JOB_STATE_CANCELLED as a
+// terminal requested state; confirm DONE is the intended one.
+func (c dataflowClient) CancelJob(job *df.Job) error {
+	update := &df.Job{RequestedState: "JOB_STATE_DONE"}
+	if _, err := c.s.Update(job.ProjectId, job.Id, update).Do(); err != nil {
+		return err
+	}
+	return nil
+}
+
+// cleanDataflowJobs cancels stale Dataflow jobs, excluding the longRunningPrefix prefixed jobs.
+func cleanDataflowJobs(c client, projectId string, hoursStale float64) error {
+	now := c.CurrentTime()
+	jobs, err := c.ListJobs(projectId)
+	if err != nil {
+		return err
+	}
+	for _, j := range jobs {
+		t, err := time.Parse(time.RFC3339, j.CreateTime)
+		if err != nil {
+			return err
+		}
+		hoursSinceCreate := now.Sub(t).Hours()
+		log.Printf("Job %v %v %v %v %.2f\n", j.ProjectId, j.Id, j.Name, j.CreateTime, hoursSinceCreate)
+		if hoursSinceCreate > hoursStale && !strings.HasPrefix(j.Name, longRunningPrefix) {
+			log.Printf("Attempting to cancel %v\n", j.Id)
+			c.CancelJob(j)
+		}
+	}
+	return nil
+}
+
+// main cancels Dataflow jobs in -project that are older than -hours_stale hours.
+func main() {
+	// The Gradle task runs this with `go run` and no arguments, so these
+	// defaults are what CI uses.
+	projectID := flag.String("project", "apache-beam-testing", "GCP project whose active Dataflow jobs are examined")
+	hoursStale := flag.Float64("hours_stale", 3.0, "cancel jobs created more than this many hours ago")
+	flag.Parse()
+
+	// Named dfClient so the local does not shadow the client interface type.
+	dfClient, err := newDataflowClient()
+	if err != nil {
+		log.Fatalf("Error creating dataflow client, %v", err)
+	}
+	if err = cleanDataflowJobs(dfClient, *projectID, *hoursStale); err != nil {
+		log.Fatalf("Error cleaning dataflow jobs, %v", err)
+	}
+	log.Printf("Done")
+}
diff --git a/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go b/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go
new file mode 100644
index 0000000..342052a
--- /dev/null
+++ b/.test-infra/tools/stale_dataflow_jobs_cleaner_test.go
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	df "google.golang.org/api/dataflow/v1b3"
+	"reflect"
+	"testing"
+	"time"
+)
+
+// Package-level state driving fakeClient: currentTime is the clock it
+// reports, jobsReturned is what ListJobs hands back, and cancelledJobs
+// records every job passed to CancelJob.
+var currentTime = time.Now()
+var jobsReturned = []*df.Job{}
+var cancelledJobs = []*df.Job{}
+
+// fakeClient is an in-memory client backed by the variables above.
+type fakeClient struct{}
+
+// ListJobs ignores projectId and returns jobsReturned.
+func (fakeClient) ListJobs(projectId string) ([]*df.Job, error) {
+	return jobsReturned, nil
+}
+
+// CancelJob appends job to cancelledJobs and always succeeds.
+func (fakeClient) CancelJob(job *df.Job) error {
+	cancelledJobs = append(cancelledJobs, job)
+	return nil
+}
+
+// CurrentTime returns the fixed test clock.
+func (fakeClient) CurrentTime() time.Time {
+	return currentTime
+}
+
+// helperForJobCancel runs cleanDataflowJobs against fakeClient, using jobList
+// as the active jobs and hoursStale as the threshold. It checks that exactly
+// expectedCancelled (in order) were cancelled and that no error was returned.
+func helperForJobCancel(t *testing.T, hoursStale float64, jobList []*df.Job, expectedCancelled []*df.Job) {
+	t.Helper()
+	var c fakeClient
+	jobsReturned = jobList
+	cancelledJobs = []*df.Job{}
+	// Pass the caller's threshold through rather than a fixed value.
+	if err := cleanDataflowJobs(c, "some-project-id", hoursStale); err != nil {
+		t.Fatalf("cleanDataflowJobs returned unexpected error: %v", err)
+	}
+	if !reflect.DeepEqual(cancelledJobs, expectedCancelled) {
+		t.Errorf("Cancelled arrays not as expected actual=%v, expected=%v", cancelledJobs, expectedCancelled)
+	}
+}
+
+func TestEmptyJobList(t *testing.T) {
+	helperForJobCancel(t, 2.0, []*df.Job{}, []*df.Job{})
+}
+
+func TestNotExpiredJob(t *testing.T) {
+	// Just under 2 hours.
+	createTime := currentTime.Add(-(2*time.Hour - time.Second))
+	helperForJobCancel(t, 2.0, []*df.Job{{CreateTime: createTime.Format(time.RFC3339)}}, []*df.Job{})
+}
+
+func TestExpiredJob(t *testing.T) {
+	// Just over 2 hours.
+	createTime := currentTime.Add(-(2*time.Hour + time.Second))
+	job := &df.Job{CreateTime: createTime.Format(time.RFC3339)}
+	helperForJobCancel(t, 2.0, []*df.Job{job}, []*df.Job{job})
+}
+
+func TestLongRunningJobNotCancelled(t *testing.T) {
+	// Stale by age, but exempt because of its name prefix.
+	createTime := currentTime.Add(-(2*time.Hour + time.Second))
+	job := &df.Job{Name: longRunningPrefix + "job", CreateTime: createTime.Format(time.RFC3339)}
+	helperForJobCancel(t, 2.0, []*df.Job{job}, []*df.Job{})
+}
+
+func TestHoursStaleThreshold(t *testing.T) {
+	// A 3-hour-old job is stale under a 1-hour threshold but not under 5 hours.
+	createTime := currentTime.Add(-3 * time.Hour)
+	job := &df.Job{CreateTime: createTime.Format(time.RFC3339)}
+	helperForJobCancel(t, 1.0, []*df.Job{job}, []*df.Job{job})
+	helperForJobCancel(t, 5.0, []*df.Job{job}, []*df.Job{})
+}
diff --git a/settings.gradle b/settings.gradle
index 1f22cb9..b8fc633 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -229,3 +229,5 @@ include "beam-test-infra-metrics"
 project(":beam-test-infra-metrics").dir = file(".test-infra/metrics")
 include "beam-sdks-java-bom"
 project(":beam-sdks-java-bom").dir = file("sdks/java/bom")
+// Test infrastructure tools in .test-infra/tools (e.g. the stale Dataflow job cleaner).
+include "beam-test-tools"
+project(":beam-test-tools").dir = file(".test-infra/tools")