You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/19 21:00:40 UTC
[beam] branch master updated: [BEAM-14487] Make drain & update terminal states. (#17710)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new de497f7766d [BEAM-14487] Make drain & update terminal states. (#17710)
de497f7766d is described below
commit de497f7766d559fbcf8558be7775afe46bd9cd0a
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Thu May 19 14:00:34 2022 -0700
[BEAM-14487] Make drain & update terminal states. (#17710)
---
.../pkg/beam/runners/dataflow/dataflowlib/job.go | 49 +++++++++++++++-------
.../beam/runners/dataflow/dataflowlib/job_test.go | 37 ++++++++++++++++
2 files changed, 70 insertions(+), 16 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 6aa0712daec..459c3f52a4b 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -240,29 +240,46 @@ func WaitForCompletion(ctx context.Context, client *df.Service, project, region,
return errors.Wrap(err, "failed to get job")
}
- switch j.CurrentState {
- case "JOB_STATE_DONE":
- log.Info(ctx, "Job succeeded!")
- return nil
-
- case "JOB_STATE_CANCELLED":
- log.Info(ctx, "Job cancelled")
+ terminal, msg, err := currentStateMessage(j.CurrentState, jobID)
+ if err != nil {
+ return err
+ }
+ log.Infof(ctx, msg)
+ if terminal {
return nil
-
- case "JOB_STATE_FAILED":
- return errors.Errorf("job %s failed", jobID)
-
- case "JOB_STATE_RUNNING":
- log.Info(ctx, "Job still running ...")
-
- default:
- log.Infof(ctx, "Job state: %v ...", j.CurrentState)
}
time.Sleep(30 * time.Second)
}
}
+// currentStateMessage indicates if the state is terminal, and provides a message to log, or an error.
+// Errors are always terminal.
+func currentStateMessage(currentState, jobID string) (bool, string, error) {
+ switch currentState {
+ // Add all Terminal Success states here.
+ case "JOB_STATE_DONE", "JOB_STATE_CANCELLED", "JOB_STATE_DRAINED", "JOB_STATE_UPDATED":
+ var state string
+ switch currentState {
+ case "JOB_STATE_DONE":
+ state = "succeeded!"
+ case "JOB_STATE_CANCELLED":
+ state = "cancelled"
+ case "JOB_STATE_DRAINED":
+ state = "drained"
+ case "JOB_STATE_UPDATED":
+ state = "updated"
+ }
+ return true, fmt.Sprintf("Job %v %v", jobID, state), nil
+ case "JOB_STATE_FAILED":
+ return true, "", errors.Errorf("Job %s failed", jobID)
+ case "JOB_STATE_RUNNING":
+ return false, "Job still running ...", nil
+ default:
+ return false, fmt.Sprintf("Job state: %v ...", currentState), nil
+ }
+}
+
// NewClient creates a new dataflow client with default application credentials
// and CloudPlatformScope. The Dataflow endpoint is optionally overridden.
func NewClient(ctx context.Context, endpoint string) (*df.Service, error) {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
index 1bf178f2700..1a366e7bebf 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
@@ -17,6 +17,7 @@ package dataflowlib
import (
"context"
+ "fmt"
"reflect"
"testing"
)
@@ -157,3 +158,39 @@ func TestValidateWorkerSettings(t *testing.T) {
})
}
}
+
+func TestCurrentStateMessage(t *testing.T) {
+ tests := []struct {
+ state string
+ term bool
+ want string
+ wantErr error
+ }{
+ {state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded!", term: true},
+ {state: "JOB_STATE_DRAINED", want: "Job JorbID-09876 drained", term: true},
+ {state: "JOB_STATE_UPDATED", want: "Job JorbID-09876 updated", term: true},
+ {state: "JOB_STATE_CANCELLED", want: "Job JorbID-09876 cancelled", term: true},
+ {state: "JOB_STATE_RUNNING", want: "Job still running ...", term: false},
+ {state: "JOB_STATE_FAILED", wantErr: fmt.Errorf("Job JorbID-09876 failed"), term: true},
+ {state: "Ossiphrage", want: "Job state: Ossiphrage ...", term: false},
+ }
+ for _, test := range tests {
+ t.Run(test.state, func(t *testing.T) {
+ const jobID = "JorbID-09876"
+ term, got, err := currentStateMessage(test.state, jobID)
+ if term != test.term {
+ termGot, termWant := "false (continues)", "true (terminal)"
+ if !test.term {
+ termGot, termWant = termWant, termGot
+ }
+ t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, termGot, termWant)
+ }
+ if err != nil && err.Error() != test.wantErr.Error() {
+ t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, err, test.wantErr)
+ }
+ if got != test.want {
+ t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, got, test.want)
+ }
+ })
+ }
+}