You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2019/12/03 21:45:08 UTC
[beam] branch release-2.17.0 updated: [BEAM-8815] Skip manifest when no artifacts are staged
This is an automated email from the ASF dual-hosted git repository.
mikhail pushed a commit to branch release-2.17.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.17.0 by this push:
new 4aed759 [BEAM-8815] Skip manifest when no artifacts are staged
new 9171e0f Merge pull request #10213 from tweise/release-2.17.0
4aed759 is described below
commit 4aed759cda92a14528a68ffacd9dfc1dbe4c340f
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sat Nov 23 21:22:51 2019 -0800
[BEAM-8815] Skip manifest when no artifacts are staged
---
.../artifact/AbstractArtifactRetrievalService.java | 20 ++++++-----
.../artifact/AbstractArtifactStagingService.java | 40 +++++++++++++---------
.../BeamFileSystemArtifactServicesTest.java | 18 ++++++++++
3 files changed, 53 insertions(+), 25 deletions(-)
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java
index 93ae657..72af9e8 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java
@@ -95,15 +95,19 @@ public abstract class AbstractArtifactRetrievalService
LOG.info("GetManifest for {}", token);
try {
- ArtifactApi.ProxyManifest proxyManifest = getManifestProxy(token);
+ final ArtifactApi.Manifest manifest;
+ if (AbstractArtifactStagingService.NO_ARTIFACTS_STAGED_TOKEN.equals(token)) {
+ manifest = ArtifactApi.Manifest.newBuilder().build();
+ } else {
+ ArtifactApi.ProxyManifest proxyManifest = getManifestProxy(token);
+ LOG.info(
+ "GetManifest for {} -> {} artifacts",
+ token,
+ proxyManifest.getManifest().getArtifactCount());
+ manifest = proxyManifest.getManifest();
+ }
ArtifactApi.GetManifestResponse response =
- ArtifactApi.GetManifestResponse.newBuilder()
- .setManifest(proxyManifest.getManifest())
- .build();
- LOG.info(
- "GetManifest for {} -> {} artifacts",
- token,
- proxyManifest.getManifest().getArtifactCount());
+ ArtifactApi.GetManifestResponse.newBuilder().setManifest(manifest).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java
index 25f09a3..86e79a5 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java
@@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractArtifactStagingService extends ArtifactStagingServiceImplBase
implements FnService {
+ public static final String NO_ARTIFACTS_STAGED_TOKEN = "__no_artifacts_staged__";
+
private static final Logger LOG = LoggerFactory.getLogger(AbstractArtifactStagingService.class);
private static final Charset CHARSET = StandardCharsets.UTF_8;
@@ -77,25 +79,29 @@ public abstract class AbstractArtifactStagingService extends ArtifactStagingServ
public void commitManifest(
CommitManifestRequest request, StreamObserver<CommitManifestResponse> responseObserver) {
try {
- String stagingSessionToken = request.getStagingSessionToken();
- ProxyManifest.Builder proxyManifestBuilder =
- ProxyManifest.newBuilder().setManifest(request.getManifest());
- for (ArtifactMetadata artifactMetadata : request.getManifest().getArtifactList()) {
- proxyManifestBuilder.addLocation(
- Location.newBuilder()
- .setName(artifactMetadata.getName())
- .setUri(getArtifactUri(stagingSessionToken, encodedFileName(artifactMetadata)))
- .build());
- }
- try (WritableByteChannel manifestWritableByteChannel = openManifest(stagingSessionToken)) {
- manifestWritableByteChannel.write(
- CHARSET.encode(JsonFormat.printer().print(proxyManifestBuilder.build())));
+ final String retrievalToken;
+ if (request.getManifest().getArtifactCount() > 0) {
+ String stagingSessionToken = request.getStagingSessionToken();
+ ProxyManifest.Builder proxyManifestBuilder =
+ ProxyManifest.newBuilder().setManifest(request.getManifest());
+ for (ArtifactMetadata artifactMetadata : request.getManifest().getArtifactList()) {
+ proxyManifestBuilder.addLocation(
+ Location.newBuilder()
+ .setName(artifactMetadata.getName())
+ .setUri(getArtifactUri(stagingSessionToken, encodedFileName(artifactMetadata)))
+ .build());
+ }
+ try (WritableByteChannel manifestWritableByteChannel = openManifest(stagingSessionToken)) {
+ manifestWritableByteChannel.write(
+ CHARSET.encode(JsonFormat.printer().print(proxyManifestBuilder.build())));
+ }
+ retrievalToken = getRetrievalToken(stagingSessionToken);
+ // TODO: Validate integrity of staged files.
+ } else {
+ retrievalToken = NO_ARTIFACTS_STAGED_TOKEN;
}
- // TODO: Validate integrity of staged files.
responseObserver.onNext(
- CommitManifestResponse.newBuilder()
- .setRetrievalToken(getRetrievalToken(stagingSessionToken))
- .build());
+ CommitManifestResponse.newBuilder().setRetrievalToken(retrievalToken).build());
responseObserver.onCompleted();
} catch (Exception e) {
// TODO: Cleanup all the artifacts.
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
index 40807ca..9585530 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
@@ -206,6 +206,24 @@ public class BeamFileSystemArtifactServicesTest {
}
@Test
+ public void noArtifactsTest() throws Exception {
+ String stagingSession = "123";
+ String stagingSessionToken =
+ BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+ stagingSession, stagingDir.toUri().getPath());
+ String stagingToken = commitManifest(stagingSessionToken, Collections.emptyList());
+ Assert.assertEquals(AbstractArtifactStagingService.NO_ARTIFACTS_STAGED_TOKEN, stagingToken);
+ Assert.assertFalse(
+ Files.exists(Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession)));
+
+ GetManifestResponse retrievedManifest =
+ retrievalBlockingStub.getManifest(
+ GetManifestRequest.newBuilder().setRetrievalToken(stagingToken).build());
+ Assert.assertEquals(
+ "Manifest with 0 artifacts", 0, retrievedManifest.getManifest().getArtifactCount());
+ }
+
+ @Test
public void putArtifactsSingleSmallFileTest() throws Exception {
String fileName = "file1";
String stagingSession = "123";