You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 01:04:09 UTC

[beam] 01/01: [prism] minimum required job services

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a0ff073e6dac9028e8ce2f713d0bc58b0b12dec4
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Sun Feb 19 06:31:56 2023 -0800

    [prism] minimum required job services
---
 .../runners/prism/internal/jobservices/artifact.go |  81 ++++++++++++
 .../beam/runners/prism/internal/jobservices/job.go | 120 +++++++++++++++++
 .../prism/internal/jobservices/management.go       | 142 +++++++++++++++++++++
 .../runners/prism/internal/jobservices/metrics.go  |   2 -
 .../runners/prism/internal/jobservices/server.go   |  83 ++++++++++++
 .../prism/internal/jobservices/server_test.go      |  79 ++++++++++++
 6 files changed, 505 insertions(+), 2 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go
new file mode 100644
index 00000000000..7ed88e5475e
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"fmt"
+	"io"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	"golang.org/x/exp/slog"
+)
+
+// ReverseArtifactRetrievalService requests every artifact dependency declared
+// by the environments of a prepared job and drains the responses.
+//
+// The first message on the stream carries the staging token, which identifies
+// the job. For each dependency a GetArtifact request is sent, and responses
+// are read until one is marked as last. The artifact bytes are only counted
+// for logging; they are not stored.
+//
+// It returns nil if the stream ends (io.EOF), an error if the staging token
+// doesn't match a known job, and any error surfaced by the stream.
+func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingService_ReverseArtifactRetrievalServiceServer) error {
+	in, err := stream.Recv()
+	if err == io.EOF {
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	// Use the locked accessor: Prepare writes to the jobs map concurrently.
+	job := s.getJob(in.GetStagingToken())
+	if job == nil {
+		return fmt.Errorf("ReverseArtifactRetrievalService: unknown staging token: %v", in.GetStagingToken())
+	}
+
+	envs := job.Pipeline.GetComponents().GetEnvironments()
+	for _, env := range envs {
+		for _, dep := range env.GetDependencies() {
+			slog.Debug("GetArtifact start",
+				slog.Group("dep",
+					slog.String("urn", dep.GetTypeUrn()),
+					slog.String("payload", string(dep.GetTypePayload()))))
+			// A failed send means the peer will never answer; bail out instead
+			// of blocking on the following Recv.
+			if err := stream.Send(&jobpb.ArtifactRequestWrapper{
+				Request: &jobpb.ArtifactRequestWrapper_GetArtifact{
+					GetArtifact: &jobpb.GetArtifactRequest{
+						Artifact: dep,
+					},
+				},
+			}); err != nil {
+				return err
+			}
+			var count int
+			for {
+				in, err := stream.Recv()
+				if err == io.EOF {
+					return nil
+				}
+				if err != nil {
+					return err
+				}
+				if in.IsLast {
+					slog.Debug("GetArtifact finish",
+						slog.Group("dep",
+							slog.String("urn", dep.GetTypeUrn()),
+							slog.String("payload", string(dep.GetTypePayload()))),
+						slog.Int("bytesReceived", count))
+					break
+				}
+				// Here's where we go through each environment's artifacts.
+				// We do nothing with them.
+				switch req := in.GetResponse().(type) {
+				case *jobpb.ArtifactResponseWrapper_GetArtifactResponse:
+					count += len(req.GetArtifactResponse.GetData())
+				case *jobpb.ArtifactResponseWrapper_ResolveArtifactResponse:
+					err := fmt.Errorf("Unexpected ResolveArtifactResponse to GetArtifact: %v", in.GetResponse())
+					slog.Error("GetArtifact failure", err)
+					return err
+				}
+			}
+		}
+	}
+	return nil
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
new file mode 100644
index 00000000000..95b1ce12af9
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package jobservices handles services necessary WRT handling jobs from
+// SDKs. Nominally this is the entry point for most users, and a job's
+// external interactions outside of pipeline execution.
+//
+// This includes handling receiving, staging, and provisioning artifacts,
+// and orchestrating external workers, such as for loopback mode.
+//
+// Execution of jobs is abstracted away to an execute function specified
+// at server construction time.
+package jobservices
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+	"sync/atomic"
+
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"golang.org/x/exp/slog"
+	"google.golang.org/protobuf/types/known/structpb"
+)
+
+// capabilities is the set of pipeline requirement URNs this runner accepts.
+// isSupported treats any requirement that is not a key of this map as
+// unsupported.
+var capabilities = map[string]struct{}{
+	urns.RequirementSplittableDoFn: {},
+}
+
+// TODO, move back to main package, and key off of executor handlers?
+// Accept whole pipeline instead, and look at every PTransform too.
+
+// isSupported checks the given requirement URNs against capabilities.
+// It returns nil when all of them are known, and otherwise an error listing
+// the unknown requirements in sorted order, comma separated.
+func isSupported(requirements []string) error {
+	var missing []string
+	for _, r := range requirements {
+		_, known := capabilities[r]
+		if known {
+			continue
+		}
+		missing = append(missing, r)
+	}
+	if len(missing) == 0 {
+		return nil
+	}
+	// Sort so the error text is deterministic regardless of input order.
+	sort.Strings(missing)
+	return fmt.Errorf("local runner doesn't support the following required features: %v", strings.Join(missing, ","))
+}
+
+// Job is an interface to the job services for executing pipelines.
+// It allows the executor to communicate status, messages, and metrics
+// back to callers of the Job Management API.
+type Job struct {
+	// key is the server-assigned identifier for the job. Prepare also hands
+	// it out as the preparation ID and the staging session token.
+	key string
+	// jobName is the name supplied in the prepare request.
+	jobName string
+
+	// Pipeline is the pipeline proto supplied in the prepare request.
+	Pipeline *pipepb.Pipeline
+	// options holds the pipeline options supplied in the prepare request.
+	options *structpb.Struct
+
+	// Management side concerns.
+
+	// msgChan carries free-form messages to GetMessageStream.
+	msgChan chan string
+	// state holds the most recent state observed by GetMessageStream.
+	state atomic.Value // jobpb.JobState_Enum
+	// stateChan carries state transitions to GetMessageStream.
+	stateChan chan jobpb.JobState_Enum
+
+	// Context used to terminate this job.
+	RootCtx  context.Context
+	CancelFn context.CancelFunc
+
+	// metrics accumulates the metrics contributed through ContributeMetrics.
+	metrics metricsStore
+}
+
+// ContributeMetrics forwards the given bundle response to the job's metrics
+// store.
+func (j *Job) ContributeMetrics(payloads *fnpb.ProcessBundleResponse) {
+	j.metrics.ContributeMetrics(payloads)
+}
+
+// String formats the job as "key[jobName]".
+func (j *Job) String() string {
+	return j.key + "[" + j.jobName + "]"
+}
+
+// LogValue returns the job's key and name as a slog group value for
+// structured logging.
+func (j *Job) LogValue() slog.Value {
+	key := slog.String("key", j.key)
+	name := slog.String("name", j.jobName)
+	return slog.GroupValue(key, name)
+}
+
+// SendMsg queues a message for delivery on the job's message stream.
+// The send blocks while the message channel is full.
+func (j *Job) SendMsg(msg string) {
+	j.msgChan <- msg
+}
+
+// Start indicates that the job is preparing to execute.
+// It publishes JobState_STARTING on the state channel, blocking while that
+// channel is full.
+func (j *Job) Start() {
+	j.stateChan <- jobpb.JobState_STARTING
+}
+
+// Running indicates that the job is executing.
+// It publishes JobState_RUNNING on the state channel, blocking while that
+// channel is full.
+func (j *Job) Running() {
+	j.stateChan <- jobpb.JobState_RUNNING
+}
+
+// Done indicates that the job completed successfully.
+// It publishes JobState_DONE on the state channel, blocking while that
+// channel is full.
+func (j *Job) Done() {
+	j.stateChan <- jobpb.JobState_DONE
+}
+
+// Failed indicates that the job completed unsuccessfully.
+// It publishes JobState_FAILED on the state channel, blocking while that
+// channel is full.
+func (j *Job) Failed() {
+	j.stateChan <- jobpb.JobState_FAILED
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
new file mode 100644
index 00000000000..23150d36a9b
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"golang.org/x/exp/slog"
+)
+
+// nextId atomically bumps the server's job counter and returns the
+// resulting job key, formatted as "job-NNN" with at least three digits.
+func (s *Server) nextId() string {
+	return fmt.Sprintf("job-%03d", atomic.AddUint32(&s.index, 1))
+}
+
+// Prepare validates the submitted pipeline and registers a new job for it.
+//
+// It returns an error if the pipeline declares requirements this runner
+// doesn't support. Otherwise the response carries the new job's key as both
+// the preparation ID and the staging session token, and points artifact
+// staging at this server's own endpoint.
+func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jobpb.PrepareJobResponse, error) {
+	// Reject unsupported pipelines before allocating anything, so a rejected
+	// request neither leaks an uncancelled context nor consumes a job ID.
+	if err := isSupported(req.GetPipeline().GetRequirements()); err != nil {
+		slog.Error("unable to run job", err, slog.String("jobname", req.GetJobName()))
+		return nil, err
+	}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Since jobs execute in the background, they should not be tied to a request's context.
+	rootCtx, cancelFn := context.WithCancel(context.Background())
+	job := &Job{
+		key:      s.nextId(),
+		Pipeline: req.GetPipeline(),
+		jobName:  req.GetJobName(),
+		options:  req.GetPipelineOptions(),
+
+		msgChan:   make(chan string, 100),
+		stateChan: make(chan jobpb.JobState_Enum, 1),
+		RootCtx:   rootCtx,
+		CancelFn:  cancelFn,
+	}
+
+	// Queue initial state of the job. The state channel has room for exactly
+	// this one entry, so the send cannot block here.
+	job.state.Store(jobpb.JobState_STOPPED)
+	job.stateChan <- jobpb.JobState_STOPPED
+
+	s.jobs[job.key] = job
+	return &jobpb.PrepareJobResponse{
+		PreparationId:       job.key,
+		StagingSessionToken: job.key,
+		ArtifactStagingEndpoint: &pipepb.ApiServiceDescriptor{
+			Url: s.Endpoint(),
+		},
+	}, nil
+}
+
+// Run starts executing a previously prepared job in the background.
+//
+// It returns an error if the preparation ID doesn't match a known job.
+// Otherwise execution is handed to the server's execute function on a new
+// goroutine and the job's key is returned as the job ID.
+func (s *Server) Run(ctx context.Context, req *jobpb.RunJobRequest) (*jobpb.RunJobResponse, error) {
+	job := s.getJob(req.GetPreparationId())
+	if job == nil {
+		// Without this check a bad ID would hand a nil job to execute and
+		// panic on job.key below.
+		return nil, fmt.Errorf("Run: unknown preparation ID: %v", req.GetPreparationId())
+	}
+
+	// Bring up a background goroutine to allow the job to continue processing.
+	go s.execute(job)
+
+	return &jobpb.RunJobResponse{
+		JobId: job.key,
+	}, nil
+}
+
+// GetMessageStream subscribes to a stream of state changes and messages from
+// the job.
+//
+// Messages and state transitions are forwarded as they arrive. The call
+// returns nil once a terminal state has been sent, and returns an error if
+// the job ID is unknown, a send on the stream fails, or the stream's context
+// is done (for example because the client went away).
+func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.JobService_GetMessageStreamServer) error {
+	job := s.getJob(req.GetJobId())
+	if job == nil {
+		return fmt.Errorf("GetMessageStream: unknown jobID: %v", req.GetJobId())
+	}
+
+	ctx := stream.Context()
+	for {
+		select {
+		case <-ctx.Done():
+			// Stop forwarding once nobody is listening, rather than parking
+			// this handler until the job next reports something.
+			return ctx.Err()
+
+		case msg := <-job.msgChan:
+			if err := stream.Send(&jobpb.JobMessagesResponse{
+				Response: &jobpb.JobMessagesResponse_MessageResponse{
+					MessageResponse: &jobpb.JobMessage{
+						MessageText: msg,
+						Importance:  jobpb.JobMessage_JOB_MESSAGE_BASIC,
+					},
+				},
+			}); err != nil {
+				return err
+			}
+
+		case state, ok := <-job.stateChan:
+			// TODO: Don't block job execution if WaitForCompletion isn't being run.
+			// The state channel means the job may only execute if something is observing
+			// the message stream, as the send on the state or message channel may block
+			// once full.
+			// Not a problem for tests or short lived batch, but would be hazardous for
+			// asynchronous jobs.
+
+			// Channel is closed, so the job must be done.
+			if !ok {
+				state = jobpb.JobState_DONE
+			}
+			job.state.Store(state)
+			if err := stream.Send(&jobpb.JobMessagesResponse{
+				Response: &jobpb.JobMessagesResponse_StateResponse{
+					StateResponse: &jobpb.JobStateEvent{
+						State: state,
+					},
+				},
+			}); err != nil {
+				return err
+			}
+			switch state {
+			case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_FAILED, jobpb.JobState_UPDATED:
+				// Reached terminal state.
+				return nil
+			}
+		}
+	}
+}
+
+// GetJobMetrics fetches the attempted and committed metrics for a given job.
+// It returns an error when the job ID is unknown.
+func (s *Server) GetJobMetrics(ctx context.Context, req *jobpb.GetJobMetricsRequest) (*jobpb.GetJobMetricsResponse, error) {
+	id := req.GetJobId()
+	job := s.getJob(id)
+	if job == nil {
+		return nil, fmt.Errorf("GetJobMetrics: unknown jobID: %v", id)
+	}
+	results := &jobpb.MetricResults{
+		Attempted: job.metrics.Results(tentative),
+		Committed: job.metrics.Results(committed),
+	}
+	return &jobpb.GetJobMetricsResponse{Metrics: results}, nil
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
index 1dc0723e3af..39936bae72f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
@@ -330,8 +330,6 @@ func (m *distributionInt64) accumulate(pyld []byte) error {
 		Min:   ordMin(m.dist.Min, dist.Min),
 		Max:   ordMax(m.dist.Max, dist.Max),
 	}
-	fmt.Println("dist", dist)
-	fmt.Println("m.dist", dist)
 	return nil
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
new file mode 100644
index 00000000000..41df57d6eb8
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"fmt"
+	"net"
+	"sync"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	"golang.org/x/exp/slog"
+
+	"google.golang.org/grpc"
+)
+
+// Server holds the gRPC server, its listener, and the set of prepared jobs.
+// It is registered as both the job service and the artifact staging service;
+// the embedded Unimplemented types cover the RPCs this type does not define.
+type Server struct {
+	jobpb.UnimplementedJobServiceServer
+	jobpb.UnimplementedArtifactStagingServiceServer
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// Job Management
+	mu    sync.Mutex      // guards jobs
+	index uint32          // job counter, advanced atomically by nextId
+	jobs  map[string]*Job // prepared jobs, keyed by job key
+
+	// execute defines how a job is executed.
+	execute func(*Job)
+}
+
+// NewServer acquires the indicated port and returns a Server with the job
+// and artifact staging services registered on it. The given execute function
+// is what Run invokes for each job.
+//
+// NewServer panics if the port cannot be listened on. The returned server
+// does not accept connections until Serve is called.
+func NewServer(port int, execute func(*Job)) *Server {
+	addr := fmt.Sprintf(":%d", port)
+	lis, err := net.Listen("tcp", addr)
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+
+	s := &Server{
+		lis:     lis,
+		jobs:    make(map[string]*Job),
+		execute: execute,
+	}
+	slog.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint()))
+
+	s.server = grpc.NewServer()
+	jobpb.RegisterJobServiceServer(s.server, s)
+	jobpb.RegisterArtifactStagingServiceServer(s.server, s)
+	return s
+}
+
+// getJob looks up a job by its key while holding the server lock.
+// It returns nil when no job is registered under id.
+func (s *Server) getJob(id string) *Job {
+	s.mu.Lock()
+	job := s.jobs[id]
+	s.mu.Unlock()
+	return job
+}
+
+// Endpoint returns the address the server's listener is bound to, as a string.
+func (s *Server) Endpoint() string {
+	addr := s.lis.Addr()
+	return addr.String()
+}
+
+// Serve serves on the started listener. Blocks.
+// The error returned by the underlying gRPC server is discarded.
+func (s *Server) Serve() {
+	s.server.Serve(s.lis)
+}
+
+// Stop the GRPC server. It uses GracefulStop on the underlying server
+// rather than an immediate stop.
+func (s *Server) Stop() {
+	s.server.GracefulStop()
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
new file mode 100644
index 00000000000..2223f030ce1
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"context"
+	"sync"
+	"testing"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// TestServer_Lifecycle validates that a server can start and stop.
+func TestServer_Lifecycle(t *testing.T) {
+	undertest := NewServer(0, func(j *Job) {
+		// The server invokes execute on its own goroutine, and FailNow-style
+		// helpers such as Fatalf may only be called from the goroutine
+		// running the test, so record the failure with Errorf instead.
+		t.Errorf("unexpected call to execute: %v", j)
+	})
+
+	go undertest.Serve()
+
+	undertest.Stop()
+}
+
+// TestServer_JobLifecycle validates that a job can be prepared and run, and
+// that running it invokes the server's execute function.
+func TestServer_JobLifecycle(t *testing.T) {
+	var called sync.WaitGroup
+	called.Add(1)
+	undertest := NewServer(0, func(j *Job) {
+		called.Done()
+	})
+	ctx := context.Background()
+
+	wantPipeline := &pipepb.Pipeline{
+		Requirements: []string{urns.RequirementSplittableDoFn},
+	}
+	wantName := "testJob"
+
+	resp, err := undertest.Prepare(ctx, &jobpb.PrepareJobRequest{
+		Pipeline: wantPipeline,
+		JobName:  wantName,
+	})
+	if err != nil {
+		t.Fatalf("server.Prepare() = %v, want nil", err)
+	}
+
+	if got := resp.GetPreparationId(); got == "" {
+		t.Fatalf("server.Prepare() = returned empty preparation ID, want non-empty: %v", prototext.Format(resp))
+	}
+
+	runResp, err := undertest.Run(ctx, &jobpb.RunJobRequest{
+		PreparationId: resp.GetPreparationId(),
+	})
+	if err != nil {
+		t.Fatalf("server.Run() = %v, want nil", err)
+	}
+	// Run reports a job ID, not a preparation ID; name the right field so a
+	// failure here isn't mistaken for a Prepare problem.
+	if got := runResp.GetJobId(); got == "" {
+		t.Fatalf("server.Run() = returned empty job ID, want non-empty")
+	}
+	// If execute is never called, this doesn't unblock and the test times out.
+	called.Wait()
+	t.Log("success!")
+	// Nothing to cleanup because we didn't start the server.
+}