You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/19 21:00:40 UTC

[beam] branch master updated: [BEAM-14487] Make drain & update terminal states. (#17710)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new de497f7766d [BEAM-14487] Make drain & update terminal states. (#17710)
de497f7766d is described below

commit de497f7766d559fbcf8558be7775afe46bd9cd0a
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Thu May 19 14:00:34 2022 -0700

    [BEAM-14487] Make drain & update terminal states. (#17710)
---
 .../pkg/beam/runners/dataflow/dataflowlib/job.go   | 49 +++++++++++++++-------
 .../beam/runners/dataflow/dataflowlib/job_test.go  | 37 ++++++++++++++++
 2 files changed, 70 insertions(+), 16 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 6aa0712daec..459c3f52a4b 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -240,29 +240,46 @@ func WaitForCompletion(ctx context.Context, client *df.Service, project, region,
 			return errors.Wrap(err, "failed to get job")
 		}
 
-		switch j.CurrentState {
-		case "JOB_STATE_DONE":
-			log.Info(ctx, "Job succeeded!")
-			return nil
-
-		case "JOB_STATE_CANCELLED":
-			log.Info(ctx, "Job cancelled")
+		terminal, msg, err := currentStateMessage(j.CurrentState, jobID)
+		if err != nil {
+			return err
+		}
+		log.Infof(ctx, msg)
+		if terminal {
 			return nil
-
-		case "JOB_STATE_FAILED":
-			return errors.Errorf("job %s failed", jobID)
-
-		case "JOB_STATE_RUNNING":
-			log.Info(ctx, "Job still running ...")
-
-		default:
-			log.Infof(ctx, "Job state: %v ...", j.CurrentState)
 		}
 
 		time.Sleep(30 * time.Second)
 	}
 }
 
+// currentStateMessage indicates if the state is terminal, and provides a message to log, or an error.
+// Errors are always terminal.
+func currentStateMessage(currentState, jobID string) (bool, string, error) {
+	switch currentState {
+	// Add all Terminal Success stats here.
+	case "JOB_STATE_DONE", "JOB_STATE_CANCELLED", "JOB_STATE_DRAINED", "JOB_STATE_UPDATED":
+		var state string
+		switch currentState {
+		case "JOB_STATE_DONE":
+			state = "succeeded!"
+		case "JOB_STATE_CANCELLED":
+			state = "cancelled"
+		case "JOB_STATE_DRAINED":
+			state = "drained"
+		case "JOB_STATE_UPDATED":
+			state = "updated"
+		}
+		return true, fmt.Sprintf("Job %v %v", jobID, state), nil
+	case "JOB_STATE_FAILED":
+		return true, "", errors.Errorf("Job %s failed", jobID)
+	case "JOB_STATE_RUNNING":
+		return false, "Job still running ...", nil
+	default:
+		return false, fmt.Sprintf("Job state: %v ...", currentState), nil
+	}
+}
+
 // NewClient creates a new dataflow client with default application credentials
 // and CloudPlatformScope. The Dataflow endpoint is optionally overridden.
 func NewClient(ctx context.Context, endpoint string) (*df.Service, error) {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
index 1bf178f2700..1a366e7bebf 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
@@ -17,6 +17,7 @@ package dataflowlib
 
 import (
 	"context"
+	"fmt"
 	"reflect"
 	"testing"
 )
@@ -157,3 +158,39 @@ func TestValidateWorkerSettings(t *testing.T) {
 		})
 	}
 }
+
+func TestCurrentStateMessage(t *testing.T) {
+	tests := []struct {
+		state   string
+		term    bool
+		want    string
+		wantErr error
+	}{
+		{state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded!", term: true},
+		{state: "JOB_STATE_DRAINED", want: "Job JorbID-09876 drained", term: true},
+		{state: "JOB_STATE_UPDATED", want: "Job JorbID-09876 updated", term: true},
+		{state: "JOB_STATE_CANCELLED", want: "Job JorbID-09876 cancelled", term: true},
+		{state: "JOB_STATE_RUNNING", want: "Job still running ...", term: false},
+		{state: "JOB_STATE_FAILED", wantErr: fmt.Errorf("Job JorbID-09876 failed"), term: true},
+		{state: "Ossiphrage", want: "Job state: Ossiphrage ...", term: false},
+	}
+	for _, test := range tests {
+		t.Run(test.state, func(t *testing.T) {
+			const jobID = "JorbID-09876"
+			term, got, err := currentStateMessage(test.state, jobID)
+			if term != test.term {
+				termGot, termWant := "false (continues)", "true (terminal)"
+				if !test.term {
+					termGot, termWant = termWant, termGot
+				}
+				t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, termGot, termWant)
+			}
+			if err != nil && err.Error() != test.wantErr.Error() {
+				t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, err, test.wantErr)
+			}
+			if got != test.want {
+				t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, got, test.want)
+			}
+		})
+	}
+}