You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/06/28 15:35:17 UTC

[beam] 01/02: Maintain job on requirement failure.

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch includeFailures
in repository https://gitbox.apache.org/repos/asf/beam.git

commit be8a7b54dbbcca5e2fc1e33180c1d66beaca8718
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Wed Jun 28 16:56:54 2023 +0200

    Maintain job on requirement failure.
---
 .../pkg/beam/runners/prism/internal/jobservices/job.go  |  7 ++++++-
 .../runners/prism/internal/jobservices/management.go    | 17 ++++++++++-------
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 4ac37c5db59..ea7b09c8441 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -31,6 +31,7 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
 	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
@@ -77,6 +78,8 @@ type Job struct {
 	msgs           []string
 	stateIdx       int
 	state          atomic.Value // jobpb.JobState_Enum
+	stateTime      time.Time
+	failureErr     error
 
 	// Context used to terminate this job.
 	RootCtx  context.Context
@@ -134,6 +137,7 @@ func (j *Job) SendMsg(msg string) {
 func (j *Job) sendState(state jobpb.JobState_Enum) {
 	j.streamCond.L.Lock()
 	defer j.streamCond.L.Unlock()
+	j.stateTime = time.Now()
 	j.stateIdx++
 	j.state.Store(state)
 	j.streamCond.Broadcast()
@@ -155,6 +159,7 @@ func (j *Job) Done() {
 }
 
 // Failed indicates that the job completed unsuccessfully.
-func (j *Job) Failed() {
+func (j *Job) Failed(err error) {
 	j.sendState(jobpb.JobState_FAILED)
+	j.failureErr = err
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 2b077c3a661..f65d2eb070f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -25,6 +25,7 @@ import (
 	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
 	"golang.org/x/exp/slog"
+	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
 func (s *Server) nextId() string {
@@ -81,8 +82,10 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
 
 	// Queue initial state of the job.
 	job.state.Store(jobpb.JobState_STOPPED)
+	s.jobs[job.key] = job
 
 	if err := isSupported(job.Pipeline.GetRequirements()); err != nil {
+		job.Failed(err)
 		slog.Error("unable to run job", slog.String("error", err.Error()), slog.String("jobname", req.GetJobName()))
 		return nil, err
 	}
@@ -136,11 +139,12 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
 		}
 	}
 	if len(errs) > 0 {
-		err := &joinError{errs: errs}
-		slog.Error("unable to run job", slog.String("cause", "unimplemented features"), slog.String("jobname", req.GetJobName()), slog.String("errors", err.Error()))
-		return nil, fmt.Errorf("found %v uses of features unimplemented in prism in job %v: %v", len(errs), req.GetJobName(), err)
+		jErr := &joinError{errs: errs}
+		slog.Error("unable to run job", slog.String("cause", "unimplemented features"), slog.String("jobname", req.GetJobName()), slog.String("errors", jErr.Error()))
+		err := fmt.Errorf("found %v uses of features unimplemented in prism in job %v: %v", len(errs), req.GetJobName(), jErr)
+		job.Failed(err)
+		return nil, err
 	}
-	s.jobs[job.key] = job
 	return &jobpb.PrepareJobResponse{
 		PreparationId:       job.key,
 		StagingSessionToken: job.key,
@@ -178,8 +182,6 @@ func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.Jo
 	curMsg := job.minMsg
 	curState := job.stateIdx
 
-	stream.Context()
-
 	state := job.state.Load().(jobpb.JobState_Enum)
 	for {
 		for (curMsg >= job.maxMsg || len(job.msgs) == 0) && curState > job.stateIdx {
@@ -283,6 +285,7 @@ func (s *Server) GetState(_ context.Context, req *jobpb.GetJobStateRequest) (*jo
 		return nil, fmt.Errorf("job with id %v not found", req.GetJobId())
 	}
 	return &jobpb.JobStateEvent{
-		State: j.state.Load().(jobpb.JobState_Enum),
+		State:     j.state.Load().(jobpb.JobState_Enum),
+		Timestamp: timestamppb.New(j.stateTime),
 	}, nil
 }