You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 01:04:08 UTC

[beam] branch prism-jobservices updated (da308835076 -> a0ff073e6da)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a change to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git


 discard da308835076 [prism] minimum required job services
 discard 2cb402ad92a [#24789][prism] add preprocessor and test (#25520)
 discard 7bb493edde2 [#24789][prism] internal/worker + tentative data (#25478)
 discard 1294ed9ba73 [Go SDK]: Retrieve file size in CreateInitialRestriction in textio.Read (#25535)
 discard d7e87949649 Fix Tensorflow intergration test model path (#25553)
 discard 9c6eb255411 [prism] add windowing strategy (#25518)
 discard 3e321d15934 Stop paying the iterator object creation tax in MultiplexingMetricTrackingFnDataReceiver (#25540)
 discard 0dd240529d1 Use WindowedValue.withValue on hot paths #21250 (#25519)
 discard 2cef59549dd Replace more uses of `ClassLoadingStrategy.Default.INJECTION` (#23210)
 discard efc1d3629c2 Update Go SDK minimum Go version to 1.19 (#25545)
 discard 03e12fb9d82 prism-fixstatic (#25546)
 discard b24e3850100 Run prbot updates on hosted runners (#25544)
 discard 0dc99edc01c Stop use of self hosted runners for some workflows. (#25542)
     add 20c8d3a2ba1 Merge pull request #25527: [CdapIO][SparkReceiverIO] Complete examples links in READMEs and CdapIO website page fixed
     add 6de67443528 Stop use of self hosted runners for some workflows. (#25542)
     add 29856ae9471 Run prbot updates on hosted runners (#25544)
     add f358fcdc8c8 prism-fixstatic (#25546)
     add 86920261ca7 Update Go SDK minimum Go version to 1.19 (#25545)
     add 2b6291250f9 Replace more uses of `ClassLoadingStrategy.Default.INJECTION` (#23210)
     add 907533220b8 Use WindowedValue.withValue on hot paths #21250 (#25519)
     add de7eb2d83b0 Stop paying the iterator object creation tax in MultiplexingMetricTrackingFnDataReceiver (#25540)
     add 977e53104f4 [prism] add windowing strategy (#25518)
     add c160a081923 Fix Tensorflow intergration test model path (#25553)
     add d652d054ecd [Go SDK]: Retrieve file size in CreateInitialRestriction in textio.Read (#25535)
     add ca1ec257c4a [#24789][prism] internal/worker + tentative data (#25478)
     add 6667eb4741b [#24789][prism] add preprocessor and test (#25520)
     new a0ff073e6da [prism] minimum required job services

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (da308835076)
            \
             N -- N -- N   refs/heads/prism-jobservices (a0ff073e6da)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[beam] 01/01: [prism] minimum required job services

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a0ff073e6dac9028e8ce2f713d0bc58b0b12dec4
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Sun Feb 19 06:31:56 2023 -0800

    [prism] minimum required job services
---
 .../runners/prism/internal/jobservices/artifact.go |  81 ++++++++++++
 .../beam/runners/prism/internal/jobservices/job.go | 120 +++++++++++++++++
 .../prism/internal/jobservices/management.go       | 142 +++++++++++++++++++++
 .../runners/prism/internal/jobservices/metrics.go  |   2 -
 .../runners/prism/internal/jobservices/server.go   |  83 ++++++++++++
 .../prism/internal/jobservices/server_test.go      |  79 ++++++++++++
 6 files changed, 505 insertions(+), 2 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go
new file mode 100644
index 00000000000..7ed88e5475e
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/artifact.go
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"fmt"
+	"io"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	"golang.org/x/exp/slog"
+)
+
+// ReverseArtifactRetrievalService requests every artifact dependency declared
+// by the environments of the job identified by the stream's staging token.
+//
+// The first message received on the stream supplies the staging token. For
+// each dependency a GetArtifact request is sent, and responses are read until
+// one is marked as the last. The received bytes are only counted for logging;
+// nothing else is done with them.
+//
+// It returns nil when the client closes the stream or once all dependencies
+// have been requested. It returns an error for an unknown staging token, a
+// stream failure, or an unexpected ResolveArtifactResponse.
+func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingService_ReverseArtifactRetrievalServiceServer) error {
+	in, err := stream.Recv()
+	if err == io.EOF {
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	// Look the job up under the server lock, since Prepare writes to the
+	// jobs map while holding it.
+	job := s.getJob(in.GetStagingToken())
+	if job == nil {
+		return fmt.Errorf("ReverseArtifactRetrievalService: unknown staging token: %v", in.GetStagingToken())
+	}
+
+	envs := job.Pipeline.GetComponents().GetEnvironments()
+	for _, env := range envs {
+		for _, dep := range env.GetDependencies() {
+			slog.Debug("GetArtifact start",
+				slog.Group("dep",
+					slog.String("urn", dep.GetTypeUrn()),
+					slog.String("payload", string(dep.GetTypePayload()))))
+			// A failed send means no response will arrive, so surface
+			// the error instead of waiting on Recv.
+			if err := stream.Send(&jobpb.ArtifactRequestWrapper{
+				Request: &jobpb.ArtifactRequestWrapper_GetArtifact{
+					GetArtifact: &jobpb.GetArtifactRequest{
+						Artifact: dep,
+					},
+				},
+			}); err != nil {
+				return err
+			}
+			var count int
+			for {
+				in, err := stream.Recv()
+				if err == io.EOF {
+					return nil
+				}
+				if err != nil {
+					return err
+				}
+				if in.IsLast {
+					slog.Debug("GetArtifact finish",
+						slog.Group("dep",
+							slog.String("urn", dep.GetTypeUrn()),
+							slog.String("payload", string(dep.GetTypePayload()))),
+						slog.Int("bytesReceived", count))
+					break
+				}
+				// Here's where we go through each environment's artifacts.
+				// We do nothing with them.
+				switch req := in.GetResponse().(type) {
+				case *jobpb.ArtifactResponseWrapper_GetArtifactResponse:
+					count += len(req.GetArtifactResponse.GetData())
+				case *jobpb.ArtifactResponseWrapper_ResolveArtifactResponse:
+					err := fmt.Errorf("Unexpected ResolveArtifactResponse to GetArtifact: %v", in.GetResponse())
+					slog.Error("GetArtifact failure", err)
+					return err
+				}
+			}
+		}
+	}
+	return nil
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
new file mode 100644
index 00000000000..95b1ce12af9
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package jobservices implements the services needed to handle jobs submitted
+// by SDKs. Nominally this is the entry point for most users, and it covers a
+// job's external interactions outside of pipeline execution.
+//
+// This includes handling receiving, staging, and provisioning artifacts,
+// and orchestrating external workers, such as for loopback mode.
+//
+// Execution of jobs is abstracted away to an execute function specified
+// at server construction time.
+package jobservices
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+	"sync/atomic"
+
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"golang.org/x/exp/slog"
+	"google.golang.org/protobuf/types/known/structpb"
+)
+
+// capabilities is the set of pipeline requirement URNs that isSupported
+// accepts; any requirement not present here is reported as unsupported.
+var capabilities = map[string]struct{}{
+	urns.RequirementSplittableDoFn: {},
+}
+
+// isSupported checks the given requirements against capabilities. It returns
+// nil when every requirement is present, and otherwise an error listing the
+// missing ones in sorted order, comma separated.
+//
+// TODO, move back to main package, and key off of executor handlers?
+// Accept whole pipeline instead, and look at every PTransform too.
+func isSupported(requirements []string) error {
+	var missing []string
+	for _, r := range requirements {
+		if _, known := capabilities[r]; known {
+			continue
+		}
+		missing = append(missing, r)
+	}
+	if len(missing) == 0 {
+		return nil
+	}
+	sort.Strings(missing)
+	return fmt.Errorf("local runner doesn't support the following required features: %v", strings.Join(missing, ","))
+}
+
+// Job holds the state of a single pipeline job managed by the job services.
+// It allows the executor to communicate status, messages, and metrics
+// back to callers of the Job Management API.
+type Job struct {
+	// key is the server-assigned identifier; jobName is the name given in
+	// the prepare request.
+	key     string
+	jobName string
+
+	// Pipeline and options are taken from the prepare request.
+	Pipeline *pipepb.Pipeline
+	options  *structpb.Struct
+
+	// Management side concerns.
+	// msgChan and stateChan carry messages and state changes to
+	// GetMessageStream; state holds the most recently stored state.
+	msgChan   chan string
+	state     atomic.Value // jobpb.JobState_Enum
+	stateChan chan jobpb.JobState_Enum
+
+	// Context used to terminate this job.
+	RootCtx  context.Context
+	CancelFn context.CancelFunc
+
+	// metrics receives the payloads passed to ContributeMetrics and backs
+	// the results returned by GetJobMetrics.
+	metrics metricsStore
+}
+
+// ContributeMetrics hands the given bundle response to the job's metrics
+// store.
+func (j *Job) ContributeMetrics(payloads *fnpb.ProcessBundleResponse) {
+	store := &j.metrics
+	store.ContributeMetrics(payloads)
+}
+
+// String renders the job as its key followed by the job name in square
+// brackets.
+func (j *Job) String() string {
+	return j.key + "[" + j.jobName + "]"
+}
+
+// LogValue returns the job's key and name as a slog group value, for use in
+// structured logging.
+func (j *Job) LogValue() slog.Value {
+	keyAttr := slog.String("key", j.key)
+	nameAttr := slog.String("name", j.jobName)
+	return slog.GroupValue(keyAttr, nameAttr)
+}
+
+// SendMsg queues a text message on the job's message channel. The send
+// blocks while that channel is full.
+func (j *Job) SendMsg(msg string) {
+	out := j.msgChan
+	out <- msg
+}
+
+// Start publishes the STARTING state on the job's state channel, signalling
+// that the job is preparing to execute.
+func (j *Job) Start() {
+	next := jobpb.JobState_STARTING
+	j.stateChan <- next
+}
+
+// Running publishes the RUNNING state on the job's state channel, signalling
+// that the job is executing.
+func (j *Job) Running() {
+	next := jobpb.JobState_RUNNING
+	j.stateChan <- next
+}
+
+// Done publishes the DONE state on the job's state channel, signalling that
+// the job completed successfully.
+func (j *Job) Done() {
+	next := jobpb.JobState_DONE
+	j.stateChan <- next
+}
+
+// Failed publishes the FAILED state on the job's state channel, signalling
+// that the job completed unsuccessfully.
+func (j *Job) Failed() {
+	next := jobpb.JobState_FAILED
+	j.stateChan <- next
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
new file mode 100644
index 00000000000..23150d36a9b
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"golang.org/x/exp/slog"
+)
+
+// nextId atomically advances the server's job counter and returns it
+// formatted as "job-" followed by a zero-padded, at least three digit number.
+func (s *Server) nextId() string {
+	return fmt.Sprintf("job-%03d", atomic.AddUint32(&s.index, 1))
+}
+
+// Prepare creates a Job for the requested pipeline and, if all of the
+// pipeline's requirements are supported, registers it with the server.
+//
+// On success the response carries the job's key as both the preparation ID
+// and the staging session token, along with this server's endpoint for
+// artifact staging. If the pipeline has unsupported requirements, the job is
+// not registered and the error from isSupported is returned.
+func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jobpb.PrepareJobResponse, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Since jobs execute in the background, they should not be tied to a request's context.
+	rootCtx, cancelFn := context.WithCancel(context.Background())
+	job := &Job{
+		key:      s.nextId(),
+		Pipeline: req.GetPipeline(),
+		jobName:  req.GetJobName(),
+		options:  req.GetPipelineOptions(),
+
+		msgChan:   make(chan string, 100),
+		stateChan: make(chan jobpb.JobState_Enum, 1),
+		RootCtx:   rootCtx,
+		CancelFn:  cancelFn,
+	}
+
+	// Queue initial state of the job.
+	job.state.Store(jobpb.JobState_STOPPED)
+	job.stateChan <- job.state.Load().(jobpb.JobState_Enum)
+
+	if err := isSupported(job.Pipeline.GetRequirements()); err != nil {
+		// The rejected job is never stored, so nothing else can call its
+		// cancel function later; release the context here.
+		cancelFn()
+		slog.Error("unable to run job", err, slog.String("jobname", req.GetJobName()))
+		return nil, err
+	}
+	s.jobs[job.key] = job
+	return &jobpb.PrepareJobResponse{
+		PreparationId:       job.key,
+		StagingSessionToken: job.key,
+		ArtifactStagingEndpoint: &pipepb.ApiServiceDescriptor{
+			Url: s.Endpoint(),
+		},
+	}, nil
+}
+
+// Run starts executing the previously prepared job named by the request's
+// preparation ID and returns its job ID.
+//
+// Execution happens on a new goroutine via the server's execute function, so
+// Run returns without waiting for the job. An unknown preparation ID yields
+// an error rather than starting anything.
+func (s *Server) Run(ctx context.Context, req *jobpb.RunJobRequest) (*jobpb.RunJobResponse, error) {
+	s.mu.Lock()
+	job := s.jobs[req.GetPreparationId()]
+	s.mu.Unlock()
+	// A missing entry would otherwise hand a nil job to execute and panic
+	// on job.key below.
+	if job == nil {
+		return nil, fmt.Errorf("Run: unknown preparation ID: %v", req.GetPreparationId())
+	}
+
+	// Bring up a background goroutine to allow the job to continue processing.
+	go s.execute(job)
+
+	return &jobpb.RunJobResponse{
+		JobId: job.key,
+	}, nil
+}
+
+// GetMessageStream subscribes to a stream of state changes and messages from
+// the job named by the request's job ID.
+//
+// Messages and states are forwarded to the stream as they arrive on the job's
+// channels, and each received state is also stored on the job. The call
+// returns nil once a terminal state has been sent. It returns an error if the
+// job ID is unknown, if sending on the stream fails, or if the stream's
+// context is done before a terminal state is reached.
+func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.JobService_GetMessageStreamServer) error {
+	s.mu.Lock()
+	job := s.jobs[req.GetJobId()]
+	s.mu.Unlock()
+	// Receiving from the channels of a nil job would panic.
+	if job == nil {
+		return fmt.Errorf("GetMessageStream: unknown jobID: %v", req.GetJobId())
+	}
+
+	ctx := stream.Context()
+	for {
+		select {
+		case <-ctx.Done():
+			// The client went away; stop waiting for job events so this
+			// handler does not linger.
+			return ctx.Err()
+
+		case msg := <-job.msgChan:
+			if err := stream.Send(&jobpb.JobMessagesResponse{
+				Response: &jobpb.JobMessagesResponse_MessageResponse{
+					MessageResponse: &jobpb.JobMessage{
+						MessageText: msg,
+						Importance:  jobpb.JobMessage_JOB_MESSAGE_BASIC,
+					},
+				},
+			}); err != nil {
+				return err
+			}
+
+		case state, ok := <-job.stateChan:
+			// TODO: Don't block job execution if WaitForCompletion isn't being run.
+			// The state channel means the job may only execute if something is observing
+			// the message stream, as the send on the state or message channel may block
+			// once full.
+			// Not a problem for tests or short lived batch, but would be hazardous for
+			// asynchronous jobs.
+
+			// Channel is closed, so the job must be done.
+			if !ok {
+				state = jobpb.JobState_DONE
+			}
+			job.state.Store(state)
+			if err := stream.Send(&jobpb.JobMessagesResponse{
+				Response: &jobpb.JobMessagesResponse_StateResponse{
+					StateResponse: &jobpb.JobStateEvent{
+						State: state,
+					},
+				},
+			}); err != nil {
+				return err
+			}
+			switch state {
+			case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_FAILED, jobpb.JobState_UPDATED:
+				// Reached terminal state.
+				return nil
+			}
+		}
+	}
+
+}
+
+// GetJobMetrics returns the attempted (tentative) and committed metric
+// results recorded for the job named in the request, or an error if no such
+// job is known to the server.
+func (s *Server) GetJobMetrics(ctx context.Context, req *jobpb.GetJobMetricsRequest) (*jobpb.GetJobMetricsResponse, error) {
+	id := req.GetJobId()
+	job := s.getJob(id)
+	if job == nil {
+		return nil, fmt.Errorf("GetJobMetrics: unknown jobID: %v", id)
+	}
+	results := &jobpb.MetricResults{
+		Attempted: job.metrics.Results(tentative),
+		Committed: job.metrics.Results(committed),
+	}
+	return &jobpb.GetJobMetricsResponse{Metrics: results}, nil
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
index 1dc0723e3af..39936bae72f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
@@ -330,8 +330,6 @@ func (m *distributionInt64) accumulate(pyld []byte) error {
 		Min:   ordMin(m.dist.Min, dist.Min),
 		Max:   ordMax(m.dist.Max, dist.Max),
 	}
-	fmt.Println("dist", dist)
-	fmt.Println("m.dist", dist)
 	return nil
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
new file mode 100644
index 00000000000..41df57d6eb8
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"fmt"
+	"net"
+	"sync"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	"golang.org/x/exp/slog"
+
+	"google.golang.org/grpc"
+)
+
+// Server implements the job service and artifact staging gRPC services,
+// tracking prepared jobs by their key.
+type Server struct {
+	jobpb.UnimplementedJobServiceServer
+	jobpb.UnimplementedArtifactStagingServiceServer
+
+	// Server management
+	// lis is the listener acquired by NewServer; server is the gRPC server
+	// that Serve runs on it.
+	lis    net.Listener
+	server *grpc.Server
+
+	// Job Management
+	// mu guards jobs. index is the counter behind generated job IDs and is
+	// advanced atomically by nextId.
+	mu    sync.Mutex
+	index uint32
+	jobs  map[string]*Job
+
+	// execute defines how a job is executed.
+	// Run invokes it on a new goroutine.
+	execute func(*Job)
+}
+
+// NewServer acquires the indicated port.
+func NewServer(port int, execute func(*Job)) *Server {
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+	s := &Server{
+		lis:     lis,
+		jobs:    make(map[string]*Job),
+		execute: execute,
+	}
+	slog.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint()))
+	var opts []grpc.ServerOption
+	s.server = grpc.NewServer(opts...)
+	jobpb.RegisterJobServiceServer(s.server, s)
+	jobpb.RegisterArtifactStagingServiceServer(s.server, s)
+	return s
+}
+
+// getJob returns the job stored under id, or nil if there is none. The
+// lookup is performed while holding the server's mutex.
+func (s *Server) getJob(id string) *Job {
+	s.mu.Lock()
+	job := s.jobs[id]
+	s.mu.Unlock()
+	return job
+}
+
+// Endpoint returns the address of the server's listener as a string.
+func (s *Server) Endpoint() string {
+	addr := s.lis.Addr()
+	return addr.String()
+}
+
+// Serve serves on the started listener. Blocks.
+//
+// Any error returned by the underlying gRPC server is logged, since this
+// method has no return value through which to report it.
+func (s *Server) Serve() {
+	if err := s.server.Serve(s.lis); err != nil {
+		slog.Error("Serve: gRPC server exited", err)
+	}
+}
+
+// Stop shuts the gRPC server down using its graceful stop.
+func (s *Server) Stop() {
+	grpcServer := s.server
+	grpcServer.GracefulStop()
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
new file mode 100644
index 00000000000..2223f030ce1
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jobservices
+
+import (
+	"context"
+	"sync"
+	"testing"
+
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// TestServer_Lifecycle checks that a server can begin serving and then be
+// stopped, without the execute function ever being invoked.
+func TestServer_Lifecycle(t *testing.T) {
+	failOnExecute := func(j *Job) {
+		t.Fatalf("unexpected call to execute: %v", j)
+	}
+	srv := NewServer(0, failOnExecute)
+
+	go srv.Serve()
+
+	srv.Stop()
+}
+
+// TestServer_JobLifecycle validates that a job can be prepared and run, and
+// that running it invokes the server's execute function.
+func TestServer_JobLifecycle(t *testing.T) {
+	var called sync.WaitGroup
+	called.Add(1)
+	undertest := NewServer(0, func(j *Job) {
+		called.Done()
+	})
+	ctx := context.Background()
+
+	wantPipeline := &pipepb.Pipeline{
+		Requirements: []string{urns.RequirementSplittableDoFn},
+	}
+	wantName := "testJob"
+
+	resp, err := undertest.Prepare(ctx, &jobpb.PrepareJobRequest{
+		Pipeline: wantPipeline,
+		JobName:  wantName,
+	})
+	if err != nil {
+		t.Fatalf("server.Prepare() = %v, want nil", err)
+	}
+
+	if got := resp.GetPreparationId(); got == "" {
+		t.Fatalf("server.Prepare() = returned empty preparation ID, want non-empty: %v", prototext.Format(resp))
+	}
+
+	runResp, err := undertest.Run(ctx, &jobpb.RunJobRequest{
+		PreparationId: resp.GetPreparationId(),
+	})
+	if err != nil {
+		t.Fatalf("server.Run() = %v, want nil", err)
+	}
+	// Run reports a job ID, not a preparation ID, so name it correctly in
+	// the failure message.
+	if got := runResp.GetJobId(); got == "" {
+		t.Fatalf("server.Run() = returned empty job ID, want non-empty")
+	}
+	// If execute is never called, this doesn't unblock and the test times out.
+	called.Wait()
+	t.Log("success!")
+	// Nothing to cleanup because we didn't start the server.
+}