You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2019/12/03 21:45:08 UTC

[beam] branch release-2.17.0 updated: [BEAM-8815] Skip manifest when no artifacts are staged

This is an automated email from the ASF dual-hosted git repository.

mikhail pushed a commit to branch release-2.17.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.17.0 by this push:
     new 4aed759  [BEAM-8815] Skip manifest when no artifacts are staged
     new 9171e0f  Merge pull request #10213 from tweise/release-2.17.0
4aed759 is described below

commit 4aed759cda92a14528a68ffacd9dfc1dbe4c340f
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sat Nov 23 21:22:51 2019 -0800

    [BEAM-8815] Skip manifest when no artifacts are staged
---
 .../artifact/AbstractArtifactRetrievalService.java | 20 ++++++-----
 .../artifact/AbstractArtifactStagingService.java   | 40 +++++++++++++---------
 .../BeamFileSystemArtifactServicesTest.java        | 18 ++++++++++
 3 files changed, 53 insertions(+), 25 deletions(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java
index 93ae657..72af9e8 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java
@@ -95,15 +95,19 @@ public abstract class AbstractArtifactRetrievalService
 
     LOG.info("GetManifest for {}", token);
     try {
-      ArtifactApi.ProxyManifest proxyManifest = getManifestProxy(token);
+      final ArtifactApi.Manifest manifest;
+      if (AbstractArtifactStagingService.NO_ARTIFACTS_STAGED_TOKEN.equals(token)) {
+        manifest = ArtifactApi.Manifest.newBuilder().build();
+      } else {
+        ArtifactApi.ProxyManifest proxyManifest = getManifestProxy(token);
+        LOG.info(
+            "GetManifest for {} -> {} artifacts",
+            token,
+            proxyManifest.getManifest().getArtifactCount());
+        manifest = proxyManifest.getManifest();
+      }
       ArtifactApi.GetManifestResponse response =
-          ArtifactApi.GetManifestResponse.newBuilder()
-              .setManifest(proxyManifest.getManifest())
-              .build();
-      LOG.info(
-          "GetManifest for {} -> {} artifacts",
-          token,
-          proxyManifest.getManifest().getArtifactCount());
+          ArtifactApi.GetManifestResponse.newBuilder().setManifest(manifest).build();
       responseObserver.onNext(response);
       responseObserver.onCompleted();
     } catch (Exception e) {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java
index 25f09a3..86e79a5 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java
@@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractArtifactStagingService extends ArtifactStagingServiceImplBase
     implements FnService {
 
+  public static final String NO_ARTIFACTS_STAGED_TOKEN = "__no_artifacts_staged__";
+
   private static final Logger LOG = LoggerFactory.getLogger(AbstractArtifactStagingService.class);
 
   private static final Charset CHARSET = StandardCharsets.UTF_8;
@@ -77,25 +79,29 @@ public abstract class AbstractArtifactStagingService extends ArtifactStagingServ
   public void commitManifest(
       CommitManifestRequest request, StreamObserver<CommitManifestResponse> responseObserver) {
     try {
-      String stagingSessionToken = request.getStagingSessionToken();
-      ProxyManifest.Builder proxyManifestBuilder =
-          ProxyManifest.newBuilder().setManifest(request.getManifest());
-      for (ArtifactMetadata artifactMetadata : request.getManifest().getArtifactList()) {
-        proxyManifestBuilder.addLocation(
-            Location.newBuilder()
-                .setName(artifactMetadata.getName())
-                .setUri(getArtifactUri(stagingSessionToken, encodedFileName(artifactMetadata)))
-                .build());
-      }
-      try (WritableByteChannel manifestWritableByteChannel = openManifest(stagingSessionToken)) {
-        manifestWritableByteChannel.write(
-            CHARSET.encode(JsonFormat.printer().print(proxyManifestBuilder.build())));
+      final String retrievalToken;
+      if (request.getManifest().getArtifactCount() > 0) {
+        String stagingSessionToken = request.getStagingSessionToken();
+        ProxyManifest.Builder proxyManifestBuilder =
+            ProxyManifest.newBuilder().setManifest(request.getManifest());
+        for (ArtifactMetadata artifactMetadata : request.getManifest().getArtifactList()) {
+          proxyManifestBuilder.addLocation(
+              Location.newBuilder()
+                  .setName(artifactMetadata.getName())
+                  .setUri(getArtifactUri(stagingSessionToken, encodedFileName(artifactMetadata)))
+                  .build());
+        }
+        try (WritableByteChannel manifestWritableByteChannel = openManifest(stagingSessionToken)) {
+          manifestWritableByteChannel.write(
+              CHARSET.encode(JsonFormat.printer().print(proxyManifestBuilder.build())));
+        }
+        retrievalToken = getRetrievalToken(stagingSessionToken);
+        // TODO: Validate integrity of staged files.
+      } else {
+        retrievalToken = NO_ARTIFACTS_STAGED_TOKEN;
       }
-      // TODO: Validate integrity of staged files.
       responseObserver.onNext(
-          CommitManifestResponse.newBuilder()
-              .setRetrievalToken(getRetrievalToken(stagingSessionToken))
-              .build());
+          CommitManifestResponse.newBuilder().setRetrievalToken(retrievalToken).build());
       responseObserver.onCompleted();
     } catch (Exception e) {
       // TODO: Cleanup all the artifacts.
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
index 40807ca..9585530 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
@@ -206,6 +206,24 @@ public class BeamFileSystemArtifactServicesTest {
   }
 
   @Test
+  public void noArtifactsTest() throws Exception {
+    String stagingSession = "123";
+    String stagingSessionToken =
+        BeamFileSystemArtifactStagingService.generateStagingSessionToken(
+            stagingSession, stagingDir.toUri().getPath());
+    String stagingToken = commitManifest(stagingSessionToken, Collections.emptyList());
+    Assert.assertEquals(AbstractArtifactStagingService.NO_ARTIFACTS_STAGED_TOKEN, stagingToken);
+    Assert.assertFalse(
+        Files.exists(Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession)));
+
+    GetManifestResponse retrievedManifest =
+        retrievalBlockingStub.getManifest(
+            GetManifestRequest.newBuilder().setRetrievalToken(stagingToken).build());
+    Assert.assertEquals(
+        "Manifest with 0 artifacts", 0, retrievedManifest.getManifest().getArtifactCount());
+  }
+
+  @Test
   public void putArtifactsSingleSmallFileTest() throws Exception {
     String fileName = "file1";
     String stagingSession = "123";