You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sc...@apache.org on 2019/01/23 18:11:49 UTC
[beam] branch master updated: [BEAM-6237] Fix ULR not deleting
artifacts after running jobs.
This is an automated email from the ASF dual-hosted git repository.
scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0e34795 [BEAM-6237] Fix ULR not deleting artifacts after running jobs.
new da66729 Merge pull request #7571: [BEAM-6237] Fix ULR not deleting artifacts after running jobs.
0e34795 is described below
commit 0e347955685c8d9310ee3ea8efa0f9268425de3d
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Fri Dec 21 16:37:48 2018 -0800
[BEAM-6237] Fix ULR not deleting artifacts after running jobs.
This change switches the ULR from using
LocalFileSystemArtifactStagerService to using
BeamFileSystemArtifactStagingService, which has functionality to remove
artifacts after running a job. With this change, ValidatesRunner tests
no longer leave huge amounts of artifacts when run with the ULR.
---
.../runners/direct/portable/ReferenceRunner.java | 39 +++++-----
.../runners/direct/portable/job/PreparingJob.java | 11 ++-
.../portable/job/ReferenceRunnerJobServer.java | 37 +++++++++-
.../portable/job/ReferenceRunnerJobService.java | 85 +++++++++++++---------
.../job/ReferenceRunnerJobServiceTest.java | 11 ++-
5 files changed, 118 insertions(+), 65 deletions(-)
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 2c2c9f0..4824bcb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -22,14 +22,12 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.getOnlyElement;
-import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.ProvisionApi.ProvisionInfo;
import org.apache.beam.model.fnexecution.v1.ProvisionApi.Resources;
import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -56,13 +54,12 @@ import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides.TransformReplacement;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.direct.ExecutableGraph;
-import org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactRetrievalService;
-import org.apache.beam.runners.direct.portable.artifact.UnsupportedArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -90,31 +87,42 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.joda.time.Duration;
import org.joda.time.Instant;
-/** The "ReferenceRunner" engine implementation. */
+/**
+ * The "ReferenceRunner" engine implementation. The ReferenceRunner uses the portability framework
+ * to execute a Pipeline on a single machine.
+ */
public class ReferenceRunner {
private final RunnerApi.Pipeline pipeline;
private final Struct options;
- @Nullable private final File artifactsDir;
+ private final String artifactRetrievalToken;
private final EnvironmentType environmentType;
private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+ /** @param environmentType The environment to use for the SDK Harness. */
private ReferenceRunner(
- Pipeline p, Struct options, @Nullable File artifactsDir, EnvironmentType environmentType) {
+ Pipeline p, Struct options, String artifactRetrievalToken, EnvironmentType environmentType) {
this.pipeline = executable(p);
this.options = options;
- this.artifactsDir = artifactsDir;
this.environmentType = environmentType;
+ this.artifactRetrievalToken = artifactRetrievalToken;
}
+ /**
+ * Creates a "ReferenceRunner" engine for a single pipeline with a Dockerized SDK harness.
+ *
+ * @param p Pipeline being executed for this job.
+ * @param options PipelineOptions for this job.
+ * @param artifactRetrievalToken Token to retrieve artifacts that have been staged.
+ */
public static ReferenceRunner forPipeline(
- RunnerApi.Pipeline p, Struct options, File artifactsDir) {
- return new ReferenceRunner(p, options, artifactsDir, EnvironmentType.DOCKER);
+ RunnerApi.Pipeline p, Struct options, String artifactRetrievalToken) {
+ return new ReferenceRunner(p, options, artifactRetrievalToken, EnvironmentType.DOCKER);
}
static ReferenceRunner forInProcessPipeline(RunnerApi.Pipeline p, Struct options) {
- return new ReferenceRunner(p, options, null, EnvironmentType.IN_PROCESS);
+ return new ReferenceRunner(p, options, "", EnvironmentType.IN_PROCESS);
}
private RunnerApi.Pipeline executable(RunnerApi.Pipeline original) {
@@ -170,17 +178,14 @@ public class ReferenceRunner {
.setPipelineOptions(options)
.setWorkerId("foo")
.setResourceLimits(Resources.getDefaultInstance())
+ .setRetrievalToken(artifactRetrievalToken)
.build();
try (GrpcFnServer<GrpcLoggingService> logging =
GrpcFnServer.allocatePortAndCreateFor(
GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
GrpcFnServer<ArtifactRetrievalService> artifact =
- artifactsDir == null
- ? GrpcFnServer.allocatePortAndCreateFor(
- UnsupportedArtifactRetrievalService.create(), serverFactory)
- : GrpcFnServer.allocatePortAndCreateFor(
- LocalFileSystemArtifactRetrievalService.forRootDirectory(artifactsDir),
- serverFactory);
+ GrpcFnServer.allocatePortAndCreateFor(
+ BeamFileSystemArtifactRetrievalService.create(), serverFactory);
GrpcFnServer<StaticGrpcProvisionService> provisioning =
GrpcFnServer.allocatePortAndCreateFor(
StaticGrpcProvisionService.create(provisionInfo), serverFactory);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/PreparingJob.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/PreparingJob.java
index 5316de7..077a517 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/PreparingJob.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/PreparingJob.java
@@ -18,10 +18,9 @@
package org.apache.beam.runners.direct.portable.job;
import com.google.auto.value.AutoValue;
-import java.nio.file.Path;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactStagerService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
/** A Job with a {@code prepare} call but no corresponding {@code run} call. */
@@ -35,9 +34,9 @@ abstract class PreparingJob implements AutoCloseable {
abstract Struct getOptions();
- abstract Path getStagingLocation();
+ abstract String getStagingSessionToken();
- abstract GrpcFnServer<LocalFileSystemArtifactStagerService> getArtifactStagingServer();
+ abstract GrpcFnServer<BeamFileSystemArtifactStagingService> getArtifactStagingServer();
@Override
public void close() throws Exception {
@@ -50,10 +49,10 @@ abstract class PreparingJob implements AutoCloseable {
abstract Builder setOptions(Struct options);
- abstract Builder setStagingLocation(Path stagingLocation);
+ abstract Builder setStagingSessionToken(String stagingSessionToken);
abstract Builder setArtifactStagingServer(
- GrpcFnServer<LocalFileSystemArtifactStagerService> server);
+ GrpcFnServer<BeamFileSystemArtifactStagingService> server);
abstract PreparingJob build();
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
index 8e1514f..31d84e8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct.portable.job;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.Arrays;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
@@ -68,7 +69,10 @@ public class ReferenceRunnerJobServer {
private static void runServer(ServerConfiguration configuration) throws Exception {
ServerFactory serverFactory = ServerFactory.createDefault();
- ReferenceRunnerJobService service = ReferenceRunnerJobService.create(serverFactory);
+ ReferenceRunnerJobService.Configuration jobServiceConfig =
+ createJobServiceConfig(configuration);
+ ReferenceRunnerJobService service =
+ ReferenceRunnerJobService.create(serverFactory, jobServiceConfig);
try (GrpcFnServer<ReferenceRunnerJobService> server =
createServer(configuration, serverFactory, service)) {
System.out.println(
@@ -92,8 +96,13 @@ public class ReferenceRunnerJobServer {
public String start() throws Exception {
ServerFactory serverFactory = ServerFactory.createDefault();
+ ReferenceRunnerJobService.Configuration jobServiceConfig =
+ createJobServiceConfig(configuration);
server =
- createServer(configuration, serverFactory, ReferenceRunnerJobService.create(serverFactory));
+ createServer(
+ configuration,
+ serverFactory,
+ ReferenceRunnerJobService.create(serverFactory, jobServiceConfig));
return server.getApiServiceDescriptor().getUrl();
}
@@ -122,11 +131,33 @@ public class ReferenceRunnerJobServer {
serverFactory);
}
- private static class ServerConfiguration {
+ /**
+ * Helper function to fill out a {@code ReferenceRunnerJobService.Configuration Configuration}
+ * object for {@code ReferenceRunnerJobService}.
+ */
+ private static ReferenceRunnerJobService.Configuration createJobServiceConfig(
+ ServerConfiguration configuration) {
+ ReferenceRunnerJobService.Configuration jobServiceConfig =
+ new ReferenceRunnerJobService.Configuration();
+ jobServiceConfig.artifactStagingPath = configuration.artifactStagingPath;
+ jobServiceConfig.keepArtifacts = configuration.keepArtifacts;
+ return jobServiceConfig;
+ }
+
+ public static class ServerConfiguration {
@Option(
name = "-p",
aliases = {"--port"},
usage = "The local port to expose the server on. 0 to use a dynamic port. (Default: 8099)")
private int port = 8099;
+
+ @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
+ String artifactStagingPath =
+ Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
+
+ @Option(
+ name = "--keep-artifacts",
+ usage = "When enabled, do not delete staged artifacts when a job completes")
+ boolean keepArtifacts;
}
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
index cca4f55..f0a83e1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobService.java
@@ -19,9 +19,6 @@ package org.apache.beam.runners.direct.portable.job;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
@@ -42,10 +39,10 @@ import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceImplBase;
import org.apache.beam.runners.direct.portable.ReferenceRunner;
-import org.apache.beam.runners.direct.portable.artifact.LocalFileSystemArtifactStagerService;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
@@ -54,31 +51,40 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Thre
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** The ReferenceRunner uses the portability framework to execute a Pipeline on a single machine. */
+/**
+ * This JobService implements the grpc calls for running jobs by using the {@code ReferenceRunner}
+ * as an engine.
+ */
public class ReferenceRunnerJobService extends JobServiceImplBase implements FnService {
+
+ /** A configuration object for constructing the {@code ReferenceRunnerJobService}. */
+ public static class Configuration {
+ public String artifactStagingPath;
+ public boolean keepArtifacts;
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(ReferenceRunnerJobService.class);
private static final int WAIT_MS = 1000;
- public static ReferenceRunnerJobService create(final ServerFactory serverFactory) {
+ public static ReferenceRunnerJobService create(
+ final ServerFactory serverFactory, Configuration configuration) {
LOG.info("Starting {}", ReferenceRunnerJobService.class);
- return new ReferenceRunnerJobService(
- serverFactory, () -> Files.createTempDirectory("reference-runner-staging"));
+ return new ReferenceRunnerJobService(serverFactory, configuration);
}
private final ServerFactory serverFactory;
- private final Callable<Path> stagingPathCallable;
+ private final Configuration configuration;
private final ConcurrentMap<String, PreparingJob> unpreparedJobs;
private final ConcurrentMap<String, ReferenceRunner> runningJobs;
private final ConcurrentMap<String, JobState.Enum> jobStates;
private final ExecutorService executor;
- private final ConcurrentLinkedQueue<GrpcFnServer<LocalFileSystemArtifactStagerService>>
+ private final ConcurrentLinkedQueue<GrpcFnServer<BeamFileSystemArtifactStagingService>>
artifactStagingServices;
- private ReferenceRunnerJobService(
- ServerFactory serverFactory, Callable<Path> stagingPathCallable) {
+ private ReferenceRunnerJobService(ServerFactory serverFactory, Configuration configuration) {
this.serverFactory = serverFactory;
- this.stagingPathCallable = stagingPathCallable;
+ this.configuration = configuration;
unpreparedJobs = new ConcurrentHashMap<>();
runningJobs = new ConcurrentHashMap<>();
jobStates = new ConcurrentHashMap<>();
@@ -91,10 +97,6 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
artifactStagingServices = new ConcurrentLinkedQueue<>();
}
- public ReferenceRunnerJobService withStagingPathSupplier(Callable<Path> supplier) {
- return new ReferenceRunnerJobService(serverFactory, supplier);
- }
-
@Override
public void prepare(
JobApi.PrepareJobRequest request,
@@ -103,29 +105,29 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
LOG.trace("{} {}", PrepareJobResponse.class.getSimpleName(), request);
String preparationId = request.getJobName() + ThreadLocalRandom.current().nextInt();
- Path tempDir = stagingPathCallable.call();
- GrpcFnServer<LocalFileSystemArtifactStagerService> artifactStagingService =
- createArtifactStagingService(tempDir);
+ GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
+ createArtifactStagingService();
artifactStagingServices.add(artifactStagingService);
- PreparingJob previous =
+ String stagingSessionToken =
+ BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+ preparationId, configuration.artifactStagingPath);
+ PreparingJob existingJob =
unpreparedJobs.putIfAbsent(
preparationId,
PreparingJob.builder()
.setArtifactStagingServer(artifactStagingService)
.setPipeline(request.getPipeline())
.setOptions(request.getPipelineOptions())
- .setStagingLocation(tempDir)
+ .setStagingSessionToken(stagingSessionToken)
.build());
checkArgument(
- previous == null, "Unexpected existing job with preparation ID %s", preparationId);
+ existingJob == null, "Unexpected existing job with preparation ID %s", preparationId);
responseObserver.onNext(
PrepareJobResponse.newBuilder()
.setPreparationId(preparationId)
.setArtifactStagingEndpoint(artifactStagingService.getApiServiceDescriptor())
- // ReferenceRunner uses LocalFileSystemArtifactStagerService which only need local
- // artifact directory.
- .setStagingSessionToken(tempDir.toFile().getAbsolutePath())
+ .setStagingSessionToken(stagingSessionToken)
.build());
responseObserver.onCompleted();
} catch (Exception e) {
@@ -134,10 +136,9 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
}
}
- private GrpcFnServer<LocalFileSystemArtifactStagerService> createArtifactStagingService(
- Path stagingPath) throws Exception {
- LocalFileSystemArtifactStagerService service =
- LocalFileSystemArtifactStagerService.forRootDirectory(stagingPath.toFile());
+ private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStagingService()
+ throws Exception {
+ BeamFileSystemArtifactStagingService service = new BeamFileSystemArtifactStagingService();
return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
}
@@ -165,9 +166,7 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
ReferenceRunner runner =
ReferenceRunner.forPipeline(
- preparingJob.getPipeline(),
- preparingJob.getOptions(),
- preparingJob.getStagingLocation().toFile());
+ preparingJob.getPipeline(), preparingJob.getOptions(), request.getRetrievalToken());
String jobId = "job-" + Integer.toString(ThreadLocalRandom.current().nextInt());
responseObserver.onNext(RunJobResponse.newBuilder().setJobId(jobId).build());
responseObserver.onCompleted();
@@ -179,11 +178,27 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
jobStates.computeIfPresent(jobId, (id, status) -> Enum.RUNNING);
runner.execute();
jobStates.computeIfPresent(jobId, (id, status) -> Enum.DONE);
- return null;
} catch (Exception e) {
jobStates.computeIfPresent(jobId, (id, status) -> Enum.FAILED);
throw e;
}
+
+ // Delete artifacts after job is done.
+ if (!configuration.keepArtifacts) {
+ String stagingSessionToken = preparingJob.getStagingSessionToken();
+ try {
+ preparingJob
+ .getArtifactStagingServer()
+ .getService()
+ .removeArtifacts(stagingSessionToken);
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to remove job staging directory for token {}: {}",
+ stagingSessionToken,
+ e);
+ }
+ }
+ return null;
});
} catch (StatusRuntimeException e) {
responseObserver.onError(e);
@@ -252,7 +267,7 @@ public class ReferenceRunnerJobService extends JobServiceImplBase implements FnS
}
}
while (!artifactStagingServices.isEmpty()) {
- GrpcFnServer<LocalFileSystemArtifactStagerService> artifactStagingService =
+ GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
artifactStagingServices.remove();
try {
artifactStagingService.close();
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
index 89c9a91..7671d45 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServiceTest.java
@@ -26,6 +26,7 @@ import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -67,9 +68,11 @@ public class ReferenceRunnerJobServiceTest {
@Before
public void setup() throws Exception {
- service =
- ReferenceRunnerJobService.create(serverFactory)
- .withStagingPathSupplier(() -> runnerTemp.getRoot().toPath());
+ ReferenceRunnerJobService.Configuration configuration =
+ new ReferenceRunnerJobService.Configuration();
+ configuration.artifactStagingPath =
+ Paths.get(runnerTemp.getRoot().toString(), "beam-artifact-staging").toString();
+ service = ReferenceRunnerJobService.create(serverFactory, configuration);
server = GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
stub =
JobServiceGrpc.newBlockingStub(
@@ -95,7 +98,7 @@ public class ReferenceRunnerJobServiceTest {
ArtifactServiceStager stager =
ArtifactServiceStager.overChannel(
InProcessChannelBuilder.forName(stagingEndpoint.getUrl()).build());
- String stagingSessionToken = "token";
+ String stagingSessionToken = response.getStagingSessionToken();
File foo = writeTempFile("foo", "foo, bar, baz".getBytes(UTF_8));
File bar = writeTempFile("spam", "spam, ham, eggs".getBytes(UTF_8));
stager.stage(