You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sc...@apache.org on 2019/01/23 18:11:49 UTC

[beam] branch master updated: [BEAM-6237] Fix ULR not deleting artifacts after running jobs.

This is an automated email from the ASF dual-hosted git repository.

scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e34795  [BEAM-6237] Fix ULR not deleting artifacts after running jobs.
     new da66729  Merge pull request #7571: [BEAM-6237] Fix ULR not deleting artifacts after running jobs.
0e34795 is described below

commit 0e347955685c8d9310ee3ea8efa0f9268425de3d
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Fri Dec 21 16:37:48 2018 -0800

    [BEAM-6237] Fix ULR not deleting artifacts after running jobs.
    
    This change switches the ULR from using
    LocalFileSystemArtifactStagerService to using
    BeamFileSystemArtifactStagingService which has functionality to remove
    artifacts after running a job. With this change ValidatesRunner tests
    no longer leave huge amounts of artifacts when run with the ULR.
---
 .../runners/direct/portable/ReferenceRunner.java   | 39 +++++-----
 .../runners/direct/portable/job/PreparingJob.java  | 11 ++-
 .../portable/job/ReferenceRunnerJobServer.java     | 37 +++++++++-
 .../portable/job/ReferenceRunnerJobService.java    | 85 +++++++++++++---------
 .../job/ReferenceRunnerJobServiceTest.java         | 11 ++-
 5 files changed, 118 insertions(+), 65 deletions(-)

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 2c2c9f0..4824bcb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -22,14 +22,12 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.getOnlyElement;
 
-import java.io.File;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.ProvisionApi.ProvisionInfo;
 import org.apache.beam.model.fnexecution.v1.ProvisionApi.Resources;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -56,13 +54,12 @@ import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
 import org.apache.beam.runners.core.construction.graph.ProtoOverrides.TransformReplacement;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
 import org.apache.beam.runners.direct.ExecutableGraph;
-import org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactRetrievalService;
-import org.apache.beam.runners.direct.portable.artifact.UnsupportedArtifactRetrievalService;
 import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.InProcessServerFactory;
 import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
 import org.apache.beam.runners.fnexecution.control.ControlClientPool;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -90,31 +87,42 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
-/** The "ReferenceRunner" engine implementation. */
+/**
+ * The "ReferenceRunner" engine implementation. The ReferenceRunner uses the portability framework
+ * to execute a Pipeline on a single machine.
+ */
 public class ReferenceRunner {
   private final RunnerApi.Pipeline pipeline;
   private final Struct options;
-  @Nullable private final File artifactsDir;
+  private final String artifactRetrievalToken;
 
   private final EnvironmentType environmentType;
 
   private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
 
+  /** @param environmentType The environment to use for the SDK Harness. */
   private ReferenceRunner(
-      Pipeline p, Struct options, @Nullable File artifactsDir, EnvironmentType environmentType) {
+      Pipeline p, Struct options, String artifactRetrievalToken, EnvironmentType environmentType) {
     this.pipeline = executable(p);
     this.options = options;
-    this.artifactsDir = artifactsDir;
     this.environmentType = environmentType;
+    this.artifactRetrievalToken = artifactRetrievalToken;
   }
 
+  /**
+   * Creates a "ReferenceRunner" engine for a single pipeline with a Dockerized SDK harness.
+   *
+   * @param p Pipeline being executed for this job.
+   * @param options PipelineOptions for this job.
+   * @param artifactRetrievalToken Token to retrieve artifacts that have been staged.
+   */
   public static ReferenceRunner forPipeline(
-      RunnerApi.Pipeline p, Struct options, File artifactsDir) {
-    return new ReferenceRunner(p, options, artifactsDir, EnvironmentType.DOCKER);
+      RunnerApi.Pipeline p, Struct options, String artifactRetrievalToken) {
+    return new ReferenceRunner(p, options, artifactRetrievalToken, EnvironmentType.DOCKER);
   }
 
   static ReferenceRunner forInProcessPipeline(RunnerApi.Pipeline p, Struct options) {
-    return new ReferenceRunner(p, options, null, EnvironmentType.IN_PROCESS);
+    return new ReferenceRunner(p, options, "", EnvironmentType.IN_PROCESS);
   }
 
   private RunnerApi.Pipeline executable(RunnerApi.Pipeline original) {
@@ -170,17 +178,14 @@ public class ReferenceRunner {
             .setPipelineOptions(options)
             .setWorkerId("foo")
             .setResourceLimits(Resources.getDefaultInstance())
+            .setRetrievalToken(artifactRetrievalToken)
             .build();
     try (GrpcFnServer<GrpcLoggingService> logging =
             GrpcFnServer.allocatePortAndCreateFor(
                 GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
         GrpcFnServer<ArtifactRetrievalService> artifact =
-            artifactsDir == null
-                ? GrpcFnServer.allocatePortAndCreateFor(
-                    UnsupportedArtifactRetrievalService.create(), serverFactory)
-                : GrpcFnServer.allocatePortAndCreateFor(
-                    LocalFileSystemArtifactRetrievalService.forRootDirectory(artifactsDir),
-                    serverFactory);
+            GrpcFnServer.allocatePortAndCreateFor(
+                BeamFileSystemArtifactRetrievalService.create(), serverFactory);
         GrpcFnServer<StaticGrpcProvisionService> provisioning =
             GrpcFnServer.allocatePortAndCreateFor(
                 StaticGrpcProvisionService.create(provisionInfo), serverFactory);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/PreparingJob.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/PreparingJob.java
index 5316de7..077a517 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/PreparingJob.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/PreparingJob.java
@@ -18,10 +18,9 @@
 package org.apache.beam.runners.direct.portable.job;
 
 import com.google.auto.value.AutoValue;
-import java.nio.file.Path;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactStagerService;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
 
 /** A Job with a {@code prepare} call but no corresponding {@code run} call. */
@@ -35,9 +34,9 @@ abstract class PreparingJob implements AutoCloseable {
 
   abstract Struct getOptions();
 
-  abstract Path getStagingLocation();
+  abstract String getStagingSessionToken();
 
-  abstract GrpcFnServer<LocalFileSystemArtifactStagerService> getArtifactStagingServer();
+  abstract GrpcFnServer<BeamFileSystemArtifactStagingService> getArtifactStagingServer();
 
   @Override
   public void close() throws Exception {
@@ -50,10 +49,10 @@ abstract class PreparingJob implements AutoCloseable {
 
     abstract Builder setOptions(Struct options);
 
-    abstract Builder setStagingLocation(Path stagingLocation);
+    abstract Builder setStagingSessionToken(String stagingSessionToken);
 
     abstract Builder setArtifactStagingServer(
-        GrpcFnServer<LocalFileSystemArtifactStagerService> server);
+        GrpcFnServer<BeamFileSystemArtifactStagingService> server);
 
     abstract PreparingJob build();
   }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
index 8e1514f..31d84e8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.direct.portable.job;
 
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
@@ -68,7 +69,10 @@ public class ReferenceRunnerJobServer {
 
   private static void runServer(ServerConfiguration configuration) throws Exception {
     ServerFactory serverFactory = ServerFactory.createDefault();
-    ReferenceRunnerJobService service = ReferenceRunnerJobService.create(serverFactory);
+    ReferenceRunnerJobService.Configuration jobServiceConfig =
+        createJobServiceConfig(configuration);
+    ReferenceRunnerJobService service =
+        ReferenceRunnerJobService.create(serverFactory, jobServiceConfig);
     try (GrpcFnServer<ReferenceRunnerJobService> server =
         createServer(configuration, serverFactory, service)) {
       System.out.println(
@@ -92,8 +96,13 @@ public class ReferenceRunnerJobServer {
 
   public String start() throws Exception {
     ServerFactory serverFactory = ServerFactory.createDefault();
+    ReferenceRunnerJobService.Configuration jobServiceConfig =
+        createJobServiceConfig(configuration);
     server =
-        createServer(configuration, serverFactory, ReferenceRunnerJobService.create(serverFactory));
+        createServer(
+            configuration,
+            serverFactory,
+            ReferenceRunnerJobService.create(serverFactory, jobServiceConfig));
 
     return server.getApiServiceDescriptor().getUrl();
   }
@@ -122,11 +131,33 @@ public class ReferenceRunnerJobServer {
         serverFactory);
   }
 
-  private static class ServerConfiguration {
+  /**
+   * Helper function to fill out a {@code ReferenceRunnerJobService.Configuration Configuration}
+   * object for {@code ReferenceRunnerJobService}.
+   */
+  private static ReferenceRunnerJobService.Configuration createJobServiceConfig(
+      ServerConfiguration configuration) {
+    ReferenceRunnerJobService.Configuration jobServiceConfig =
+        new ReferenceRunnerJobService.Configuration();
+    jobServiceConfig.artifactStagingPath = configuration.artifactStagingPath;
+    jobServiceConfig.keepArtifacts = configuration.keepArtifacts;
+    return jobServiceConfig;
+  }
+
+  public static class ServerConfiguration {
     @Option(
         name = "-p",
         aliases = {"--port"},
         usage = "The local port to expose the server on. 0 to use a dynamic port. (Default: 8099)")
     private int port = 8099;
+
+    @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
+    String artifactStagingPath =
+        Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
+
+    @Option(
+        name = "--keep-artifacts",
+        usage = "When enabled, do not delete staged artifacts when a job completes")
+    boolean keepArtifacts;
   }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
index cca4f55..f0a83e1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
@@ -19,9 +19,6 @@ package org.apache.beam.runners.direct.portable.job;
 
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
@@ -42,10 +39,10 @@ import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
 import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
 import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceImplBase;
 import org.apache.beam.runners.direct.portable.ReferenceRunner;
-import org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactStagerService;
 import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
 import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
 import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;
 import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
@@ -54,31 +51,40 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Thre
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The ReferenceRunner uses the portability framework to execute a Pipeline on a single machine. */
+/**
+ * This JobService implements the grpc calls for running jobs by using the {@code ReferenceRunner}
+ * as an engine.
+ */
 public class ReferenceRunnerJobService extends JobServiceImplBase implements FnService {
+
+  /** A configuration object for constructing the {@code ReferenceRunnerJobService}. */
+  public static class Configuration {
+    public String artifactStagingPath;
+    public boolean keepArtifacts;
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ReferenceRunnerJobService.class);
   private static final int WAIT_MS = 1000;
 
-  public static ReferenceRunnerJobService create(final ServerFactory serverFactory) {
+  public static ReferenceRunnerJobService create(
+      final ServerFactory serverFactory, Configuration configuration) {
     LOG.info("Starting {}", ReferenceRunnerJobService.class);
-    return new ReferenceRunnerJobService(
-        serverFactory, () -> Files.createTempDirectory("reference-runner-staging"));
+    return new ReferenceRunnerJobService(serverFactory, configuration);
   }
 
   private final ServerFactory serverFactory;
-  private final Callable<Path> stagingPathCallable;
+  private final Configuration configuration;
 
   private final ConcurrentMap<String, PreparingJob> unpreparedJobs;
   private final ConcurrentMap<String, ReferenceRunner> runningJobs;
   private final ConcurrentMap<String, JobState.Enum> jobStates;
   private final ExecutorService executor;
-  private final ConcurrentLinkedQueue<GrpcFnServer<LocalFileSystemArtifactStagerService>>
+  private final ConcurrentLinkedQueue<GrpcFnServer<BeamFileSystemArtifactStagingService>>
       artifactStagingServices;
 
-  private ReferenceRunnerJobService(
-      ServerFactory serverFactory, Callable<Path> stagingPathCallable) {
+  private ReferenceRunnerJobService(ServerFactory serverFactory, Configuration configuration) {
     this.serverFactory = serverFactory;
-    this.stagingPathCallable = stagingPathCallable;
+    this.configuration = configuration;
     unpreparedJobs = new ConcurrentHashMap<>();
     runningJobs = new ConcurrentHashMap<>();
     jobStates = new ConcurrentHashMap<>();
@@ -91,10 +97,6 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
     artifactStagingServices = new ConcurrentLinkedQueue<>();
   }
 
-  public ReferenceRunnerJobService withStagingPathSupplier(Callable<Path> supplier) {
-    return new ReferenceRunnerJobService(serverFactory, supplier);
-  }
-
   @Override
   public void prepare(
       JobApi.PrepareJobRequest request,
@@ -103,29 +105,29 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
       LOG.trace("{} {}", PrepareJobResponse.class.getSimpleName(), request);
 
       String preparationId = request.getJobName() + ThreadLocalRandom.current().nextInt();
-      Path tempDir = stagingPathCallable.call();
-      GrpcFnServer<LocalFileSystemArtifactStagerService> artifactStagingService =
-          createArtifactStagingService(tempDir);
+      GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
+          createArtifactStagingService();
       artifactStagingServices.add(artifactStagingService);
-      PreparingJob previous =
+      String stagingSessionToken =
+          BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+              preparationId, configuration.artifactStagingPath);
+      PreparingJob existingJob =
           unpreparedJobs.putIfAbsent(
               preparationId,
               PreparingJob.builder()
                   .setArtifactStagingServer(artifactStagingService)
                   .setPipeline(request.getPipeline())
                   .setOptions(request.getPipelineOptions())
-                  .setStagingLocation(tempDir)
+                  .setStagingSessionToken(stagingSessionToken)
                   .build());
       checkArgument(
-          previous == null, "Unexpected existing job with preparation ID %s", preparationId);
+          existingJob == null, "Unexpected existing job with preparation ID %s", preparationId);
 
       responseObserver.onNext(
           PrepareJobResponse.newBuilder()
               .setPreparationId(preparationId)
               .setArtifactStagingEndpoint(artifactStagingService.getApiServiceDescriptor())
-              // ReferenceRunner uses LocalFileSystemArtifactStagerService which only need local
-              // artifact directory.
-              .setStagingSessionToken(tempDir.toFile().getAbsolutePath())
+              .setStagingSessionToken(stagingSessionToken)
               .build());
       responseObserver.onCompleted();
     } catch (Exception e) {
@@ -134,10 +136,9 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
     }
   }
 
-  private GrpcFnServer<LocalFileSystemArtifactStagerService> createArtifactStagingService(
-      Path stagingPath) throws Exception {
-    LocalFileSystemArtifactStagerService service =
-        LocalFileSystemArtifactStagerService.forRootDirectory(stagingPath.toFile());
+  private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStagingService()
+      throws Exception {
+    BeamFileSystemArtifactStagingService service = new BeamFileSystemArtifactStagingService();
     return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
   }
 
@@ -165,9 +166,7 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
 
       ReferenceRunner runner =
           ReferenceRunner.forPipeline(
-              preparingJob.getPipeline(),
-              preparingJob.getOptions(),
-              preparingJob.getStagingLocation().toFile());
+              preparingJob.getPipeline(), preparingJob.getOptions(), request.getRetrievalToken());
       String jobId = "job-" + Integer.toString(ThreadLocalRandom.current().nextInt());
       responseObserver.onNext(RunJobResponse.newBuilder().setJobId(jobId).build());
       responseObserver.onCompleted();
@@ -179,11 +178,27 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
               jobStates.computeIfPresent(jobId, (id, status) -> Enum.RUNNING);
               runner.execute();
               jobStates.computeIfPresent(jobId, (id, status) -> Enum.DONE);
-              return null;
             } catch (Exception e) {
               jobStates.computeIfPresent(jobId, (id, status) -> Enum.FAILED);
               throw e;
             }
+
+            // Delete artifacts after job is done.
+            if (!configuration.keepArtifacts) {
+              String stagingSessionToken = preparingJob.getStagingSessionToken();
+              try {
+                preparingJob
+                    .getArtifactStagingServer()
+                    .getService()
+                    .removeArtifacts(stagingSessionToken);
+              } catch (Exception e) {
+                LOG.error(
+                    "Failed to remove job staging directory for token {}: {}",
+                    stagingSessionToken,
+                    e);
+              }
+            }
+            return null;
           });
     } catch (StatusRuntimeException e) {
       responseObserver.onError(e);
@@ -252,7 +267,7 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
       }
     }
     while (!artifactStagingServices.isEmpty()) {
-      GrpcFnServer<LocalFileSystemArtifactStagerService> artifactStagingService =
+      GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
           artifactStagingServices.remove();
       try {
         artifactStagingService.close();
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
index 89c9a91..7671d45 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
@@ -26,6 +26,7 @@ import java.io.FileOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -67,9 +68,11 @@ public class ReferenceRunnerJobServiceTest {
 
   @Before
   public void setup() throws Exception {
-    service =
-        ReferenceRunnerJobService.create(serverFactory)
-            .withStagingPathSupplier(() -> runnerTemp.getRoot().toPath());
+    ReferenceRunnerJobService.Configuration configuration =
+        new ReferenceRunnerJobService.Configuration();
+    configuration.artifactStagingPath =
+        Paths.get(runnerTemp.getRoot().toString(), "beam-artifact-staging").toString();
+    service = ReferenceRunnerJobService.create(serverFactory, configuration);
     server = GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
     stub =
         JobServiceGrpc.newBlockingStub(
@@ -95,7 +98,7 @@ public class ReferenceRunnerJobServiceTest {
     ArtifactServiceStager stager =
         ArtifactServiceStager.overChannel(
             InProcessChannelBuilder.forName(stagingEndpoint.getUrl()).build());
-    String stagingSessionToken = "token";
+    String stagingSessionToken = response.getStagingSessionToken();
     File foo = writeTempFile("foo", "foo, bar, baz".getBytes(UTF_8));
     File bar = writeTempFile("spam", "spam, ham, eggs".getBytes(UTF_8));
     stager.stage(