You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 00:57:35 UTC
[beam] 13/13: [prism] minimum required job services
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git
commit da30883507668e4cfb1f378186a47aa5dce201be
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Sun Feb 19 06:25:05 2023 -0800
[prism] minimum required job services
---
.../runners/prism/internal/jobservices/artifact.go | 81 ++++++++++++
.../beam/runners/prism/internal/jobservices/job.go | 120 +++++++++++++++++
.../prism/internal/jobservices/management.go | 142 +++++++++++++++++++++
.../runners/prism/internal/jobservices/metrics.go | 2 -
.../runners/prism/internal/jobservices/server.go | 83 ++++++++++++
.../prism/internal/jobservices/server_test.go | 79 ++++++++++++
6 files changed, 505 insertions(+), 2 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go
new file mode 100644
index 00000000000..7ed88e5475e
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+ "fmt"
+ "io"
+
+ jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+ "golang.org/x/exp/slog"
+)
+
+func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingService_ReverseArtifactRetrievalServiceServer) error {
+ in, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ job := s.jobs[in.GetStagingToken()]
+
+ envs := job.Pipeline.GetComponents().GetEnvironments()
+ for _, env := range envs {
+ for _, dep := range env.GetDependencies() {
+ slog.Debug("GetArtifact start",
+ slog.Group("dep",
+ slog.String("urn", dep.GetTypeUrn()),
+ slog.String("payload", string(dep.GetTypePayload()))))
+ stream.Send(&jobpb.ArtifactRequestWrapper{
+ Request: &jobpb.ArtifactRequestWrapper_GetArtifact{
+ GetArtifact: &jobpb.GetArtifactRequest{
+ Artifact: dep,
+ },
+ },
+ })
+ var count int
+ for {
+ in, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ if in.IsLast {
+ slog.Debug("GetArtifact finish",
+ slog.Group("dep",
+ slog.String("urn", dep.GetTypeUrn()),
+ slog.String("payload", string(dep.GetTypePayload()))),
+ slog.Int("bytesReceived", count))
+ break
+ }
+ // Here's where we go through each environment's artifacts.
+ // We do nothing with them.
+ switch req := in.GetResponse().(type) {
+ case *jobpb.ArtifactResponseWrapper_GetArtifactResponse:
+ count += len(req.GetArtifactResponse.GetData())
+ case *jobpb.ArtifactResponseWrapper_ResolveArtifactResponse:
+ err := fmt.Errorf("Unexpected ResolveArtifactResponse to GetArtifact: %v", in.GetResponse())
+ slog.Error("GetArtifact failure", err)
+ return err
+ }
+ }
+ }
+ }
+ return nil
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
new file mode 100644
index 00000000000..95b1ce12af9
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package jobservices provides the services needed to handle jobs submitted
+// by SDKs. Nominally this is the entry point for most users, and it covers a
+// job's external interactions outside of pipeline execution.
+//
+// This includes handling receiving, staging, and provisioning artifacts,
+// and orchestrating external workers, such as for loopback mode.
+//
+// Execution of jobs is abstracted away to an execute function specified
+// at server construction time.
+package jobservices
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "strings"
+ "sync/atomic"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+ "golang.org/x/exp/slog"
+ "google.golang.org/protobuf/types/known/structpb"
+)
+
+var capabilities = map[string]struct{}{
+ urns.RequirementSplittableDoFn: {},
+}
+
+// TODO, move back to main package, and key off of executor handlers?
+// Accept whole pipeline instead, and look at every PTransform too.
+func isSupported(requirements []string) error {
+ var unsupported []string
+ for _, req := range requirements {
+ if _, ok := capabilities[req]; !ok {
+ unsupported = append(unsupported, req)
+ }
+ }
+ if len(unsupported) > 0 {
+ sort.Strings(unsupported)
+ return fmt.Errorf("local runner doesn't support the following required features: %v", strings.Join(unsupported, ","))
+ }
+ return nil
+}
+
+// Job is an interface to the job services for executing pipelines.
+// It allows the executor to communicate status, messages, and metrics
+// back to callers of the Job Management API.
+type Job struct {
+ key string
+ jobName string
+
+ Pipeline *pipepb.Pipeline
+ options *structpb.Struct
+
+ // Management side concerns.
+ msgChan chan string
+ state atomic.Value // jobpb.JobState_Enum
+ stateChan chan jobpb.JobState_Enum
+
+ // Context used to terminate this job.
+ RootCtx context.Context
+ CancelFn context.CancelFunc
+
+ metrics metricsStore
+}
+
+func (j *Job) ContributeMetrics(payloads *fnpb.ProcessBundleResponse) {
+ j.metrics.ContributeMetrics(payloads)
+}
+
+func (j *Job) String() string {
+ return fmt.Sprintf("%v[%v]", j.key, j.jobName)
+}
+
+func (j *Job) LogValue() slog.Value {
+ return slog.GroupValue(
+ slog.String("key", j.key),
+ slog.String("name", j.jobName))
+}
+
+func (j *Job) SendMsg(msg string) {
+ j.msgChan <- msg
+}
+
+// Start indicates that the job is preparing to execute.
+func (j *Job) Start() {
+ j.stateChan <- jobpb.JobState_STARTING
+}
+
+// Running indicates that the job is executing.
+func (j *Job) Running() {
+ j.stateChan <- jobpb.JobState_RUNNING
+}
+
+// Done indicates that the job completed successfully.
+func (j *Job) Done() {
+ j.stateChan <- jobpb.JobState_DONE
+}
+
+// Failed indicates that the job completed unsuccessfully.
+func (j *Job) Failed() {
+ j.stateChan <- jobpb.JobState_FAILED
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
new file mode 100644
index 00000000000..23150d36a9b
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+ "context"
+ "fmt"
+ "sync/atomic"
+
+ jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "golang.org/x/exp/slog"
+)
+
+func (s *Server) nextId() string {
+ v := atomic.AddUint32(&s.index, 1)
+ return fmt.Sprintf("job-%03d", v)
+}
+
+func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jobpb.PrepareJobResponse, error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ // Since jobs execute in the background, they should not be tied to a request's context.
+ rootCtx, cancelFn := context.WithCancel(context.Background())
+ job := &Job{
+ key: s.nextId(),
+ Pipeline: req.GetPipeline(),
+ jobName: req.GetJobName(),
+ options: req.GetPipelineOptions(),
+
+ msgChan: make(chan string, 100),
+ stateChan: make(chan jobpb.JobState_Enum, 1),
+ RootCtx: rootCtx,
+ CancelFn: cancelFn,
+ }
+
+ // Queue initial state of the job.
+ job.state.Store(jobpb.JobState_STOPPED)
+ job.stateChan <- job.state.Load().(jobpb.JobState_Enum)
+
+ if err := isSupported(job.Pipeline.GetRequirements()); err != nil {
+ slog.Error("unable to run job", err, slog.String("jobname", req.GetJobName()))
+ return nil, err
+ }
+ s.jobs[job.key] = job
+ return &jobpb.PrepareJobResponse{
+ PreparationId: job.key,
+ StagingSessionToken: job.key,
+ ArtifactStagingEndpoint: &pipepb.ApiServiceDescriptor{
+ Url: s.Endpoint(),
+ },
+ }, nil
+}
+
+func (s *Server) Run(ctx context.Context, req *jobpb.RunJobRequest) (*jobpb.RunJobResponse, error) {
+ s.mu.Lock()
+ job := s.jobs[req.GetPreparationId()]
+ s.mu.Unlock()
+
+ // Bring up a background goroutine to allow the job to continue processing.
+ go s.execute(job)
+
+ return &jobpb.RunJobResponse{
+ JobId: job.key,
+ }, nil
+}
+
+// Subscribe to a stream of state changes and messages from the job
+func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.JobService_GetMessageStreamServer) error {
+ s.mu.Lock()
+ job := s.jobs[req.GetJobId()]
+ s.mu.Unlock()
+
+ for {
+ select {
+ case msg := <-job.msgChan:
+ stream.Send(&jobpb.JobMessagesResponse{
+ Response: &jobpb.JobMessagesResponse_MessageResponse{
+ MessageResponse: &jobpb.JobMessage{
+ MessageText: msg,
+ Importance: jobpb.JobMessage_JOB_MESSAGE_BASIC,
+ },
+ },
+ })
+
+ case state, ok := <-job.stateChan:
+ // TODO: Don't block job execution if WaitForCompletion isn't being run.
+ // The state channel means the job may only execute if something is observing
+ // the message stream, as the send on the state or message channel may block
+ // once full.
+ // Not a problem for tests or short lived batch, but would be hazardous for
+ // asynchronous jobs.
+
+ // Channel is closed, so the job must be done.
+ if !ok {
+ state = jobpb.JobState_DONE
+ }
+ job.state.Store(state)
+ stream.Send(&jobpb.JobMessagesResponse{
+ Response: &jobpb.JobMessagesResponse_StateResponse{
+ StateResponse: &jobpb.JobStateEvent{
+ State: state,
+ },
+ },
+ })
+ switch state {
+ case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_FAILED, jobpb.JobState_UPDATED:
+ // Reached terminal state.
+ return nil
+ }
+ }
+ }
+
+}
+
+// GetJobMetrics Fetch metrics for a given job.
+func (s *Server) GetJobMetrics(ctx context.Context, req *jobpb.GetJobMetricsRequest) (*jobpb.GetJobMetricsResponse, error) {
+ j := s.getJob(req.GetJobId())
+ if j == nil {
+ return nil, fmt.Errorf("GetJobMetrics: unknown jobID: %v", req.GetJobId())
+ }
+ return &jobpb.GetJobMetricsResponse{
+ Metrics: &jobpb.MetricResults{
+ Attempted: j.metrics.Results(tentative),
+ Committed: j.metrics.Results(committed),
+ },
+ }, nil
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
index 1dc0723e3af..39936bae72f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
@@ -330,8 +330,6 @@ func (m *distributionInt64) accumulate(pyld []byte) error {
Min: ordMin(m.dist.Min, dist.Min),
Max: ordMax(m.dist.Max, dist.Max),
}
- fmt.Println("dist", dist)
- fmt.Println("m.dist", dist)
return nil
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
new file mode 100644
index 00000000000..41df57d6eb8
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+ "fmt"
+ "net"
+ "sync"
+
+ jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+ "golang.org/x/exp/slog"
+
+ "google.golang.org/grpc"
+)
+
+type Server struct {
+ jobpb.UnimplementedJobServiceServer
+ jobpb.UnimplementedArtifactStagingServiceServer
+
+ // Server management
+ lis net.Listener
+ server *grpc.Server
+
+ // Job Management
+ mu sync.Mutex
+ index uint32
+ jobs map[string]*Job
+
+ // execute defines how a job is executed.
+ execute func(*Job)
+}
+
+// NewServer acquires the indicated port.
+func NewServer(port int, execute func(*Job)) *Server {
+ lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+ if err != nil {
+ panic(fmt.Sprintf("failed to listen: %v", err))
+ }
+ s := &Server{
+ lis: lis,
+ jobs: make(map[string]*Job),
+ execute: execute,
+ }
+ slog.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint()))
+ var opts []grpc.ServerOption
+ s.server = grpc.NewServer(opts...)
+ jobpb.RegisterJobServiceServer(s.server, s)
+ jobpb.RegisterArtifactStagingServiceServer(s.server, s)
+ return s
+}
+
+func (s *Server) getJob(id string) *Job {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.jobs[id]
+}
+
+func (s *Server) Endpoint() string {
+ return s.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (s *Server) Serve() {
+ s.server.Serve(s.lis)
+}
+
+// Stop the GRPC server.
+func (s *Server) Stop() {
+ s.server.GracefulStop()
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
new file mode 100644
index 00000000000..2223f030ce1
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+ "context"
+ "sync"
+ "testing"
+
+ jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+ "google.golang.org/protobuf/encoding/prototext"
+)
+
+// TestServer_Lifecycle validates that a server can start and stop.
+func TestServer_Lifecycle(t *testing.T) {
+ undertest := NewServer(0, func(j *Job) {
+ t.Fatalf("unexpected call to execute: %v", j)
+ })
+
+ go undertest.Serve()
+
+ undertest.Stop()
+}
+
+// Validates that a job can start and stop.
+func TestServer_JobLifecycle(t *testing.T) {
+ var called sync.WaitGroup
+ called.Add(1)
+ undertest := NewServer(0, func(j *Job) {
+ called.Done()
+ })
+ ctx := context.Background()
+
+ wantPipeline := &pipepb.Pipeline{
+ Requirements: []string{urns.RequirementSplittableDoFn},
+ }
+ wantName := "testJob"
+
+ resp, err := undertest.Prepare(ctx, &jobpb.PrepareJobRequest{
+ Pipeline: wantPipeline,
+ JobName: wantName,
+ })
+ if err != nil {
+ t.Fatalf("server.Prepare() = %v, want nil", err)
+ }
+
+ if got := resp.GetPreparationId(); got == "" {
+ t.Fatalf("server.Prepare() = returned empty preparation ID, want non-empty: %v", prototext.Format(resp))
+ }
+
+ runResp, err := undertest.Run(ctx, &jobpb.RunJobRequest{
+ PreparationId: resp.GetPreparationId(),
+ })
+ if err != nil {
+ t.Fatalf("server.Run() = %v, want nil", err)
+ }
+ if got := runResp.GetJobId(); got == "" {
+ t.Fatalf("server.Run() = returned empty preparation ID, want non-empty")
+ }
+ // If execute is never called, this doesn't unblock and timesout.
+ called.Wait()
+ t.Log("success!")
+ // Nothing to cleanup because we didn't start the server.
+}