You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/06/28 15:35:17 UTC
[beam] 01/02: Maintain job on requirement failure.
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch includeFailures
in repository https://gitbox.apache.org/repos/asf/beam.git
commit be8a7b54dbbcca5e2fc1e33180c1d66beaca8718
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Wed Jun 28 16:56:54 2023 +0200
Maintain job on requirement failure.
---
.../pkg/beam/runners/prism/internal/jobservices/job.go | 7 ++++++-
.../runners/prism/internal/jobservices/management.go | 17 ++++++++++-------
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 4ac37c5db59..ea7b09c8441 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -31,6 +31,7 @@ import (
"strings"
"sync"
"sync/atomic"
+ "time"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
@@ -77,6 +78,8 @@ type Job struct {
msgs []string
stateIdx int
state atomic.Value // jobpb.JobState_Enum
+ stateTime time.Time
+ failureErr error
// Context used to terminate this job.
RootCtx context.Context
@@ -134,6 +137,7 @@ func (j *Job) SendMsg(msg string) {
func (j *Job) sendState(state jobpb.JobState_Enum) {
j.streamCond.L.Lock()
defer j.streamCond.L.Unlock()
+ j.stateTime = time.Now()
j.stateIdx++
j.state.Store(state)
j.streamCond.Broadcast()
@@ -155,6 +159,7 @@ func (j *Job) Done() {
}
// Failed indicates that the job completed unsuccessfully.
-func (j *Job) Failed() {
+func (j *Job) Failed(err error) {
+ j.failureErr = err // Record the cause first: sendState broadcasts under the lock, so woken listeners must already see it.
j.sendState(jobpb.JobState_FAILED)
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 2b077c3a661..f65d2eb070f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -25,6 +25,7 @@ import (
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"golang.org/x/exp/slog"
+ "google.golang.org/protobuf/types/known/timestamppb"
)
func (s *Server) nextId() string {
@@ -81,8 +82,10 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
// Queue initial state of the job.
job.state.Store(jobpb.JobState_STOPPED)
+ s.jobs[job.key] = job
if err := isSupported(job.Pipeline.GetRequirements()); err != nil {
+ job.Failed(err)
slog.Error("unable to run job", slog.String("error", err.Error()), slog.String("jobname", req.GetJobName()))
return nil, err
}
@@ -136,11 +139,12 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
}
}
if len(errs) > 0 {
- err := &joinError{errs: errs}
- slog.Error("unable to run job", slog.String("cause", "unimplemented features"), slog.String("jobname", req.GetJobName()), slog.String("errors", err.Error()))
- return nil, fmt.Errorf("found %v uses of features unimplemented in prism in job %v: %v", len(errs), req.GetJobName(), err)
+ jErr := &joinError{errs: errs}
+ slog.Error("unable to run job", slog.String("cause", "unimplemented features"), slog.String("jobname", req.GetJobName()), slog.String("errors", jErr.Error()))
+ err := fmt.Errorf("found %v uses of features unimplemented in prism in job %v: %v", len(errs), req.GetJobName(), jErr)
+ job.Failed(err)
+ return nil, err
}
- s.jobs[job.key] = job
return &jobpb.PrepareJobResponse{
PreparationId: job.key,
StagingSessionToken: job.key,
@@ -178,8 +182,6 @@ func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.Jo
curMsg := job.minMsg
curState := job.stateIdx
- stream.Context()
-
state := job.state.Load().(jobpb.JobState_Enum)
for {
for (curMsg >= job.maxMsg || len(job.msgs) == 0) && curState > job.stateIdx {
@@ -283,6 +285,7 @@ func (s *Server) GetState(_ context.Context, req *jobpb.GetJobStateRequest) (*jo
return nil, fmt.Errorf("job with id %v not found", req.GetJobId())
}
return &jobpb.JobStateEvent{
- State: j.state.Load().(jobpb.JobState_Enum),
+ State: j.state.Load().(jobpb.JobState_Enum),
+ Timestamp: timestamppb.New(j.stateTime),
}, nil
}