You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/08 05:33:02 UTC

[GitHub] [beam] robertwb opened a new pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

robertwb opened a new pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342
 
 
   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410508971
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
+          chunk = bytesQueue.take();
+        }
+        dest.getOutputStream().close();
+        return originalArtifact
+            .toBuilder()
+            .setTypeUrn(dest.getTypeUrn())
+            .setTypePayload(dest.getTypePayload())
+            .build();
+      } catch (IOException | InterruptedException exn) {
+        // As this thread will no longer be draining the queue, we don't want to get stuck writing
+        // to it.
+        totalPendingBytes.setException(exn);
+        LOG.error("Exception staging artifacts", exn);
+        if (exn instanceof IOException) {
+          throw (IOException) exn;
+        } else {
+          throw new RuntimeException(exn);
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(
+      StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+
+    return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() {
+
+      /** The maximum number of parallel threads to use to stage. */
+      public static final int THREAD_POOL_SIZE = 10;
+
+      /** The maximum number of bytes to buffer across all writes before throttling. */
+      public static final int MAX_PENDING_BYTES = 100 << 20; // 100 MB
+
+      String stagingToken;
+      Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
+      Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
+      ExecutorService stagingExecutor;
+      OverflowingSemaphore totalPendingBytes;
+
+      State state = State.START;
+      Queue<String> pendingResolves;
+      String currentEnvironment;
+      int nameIndex;
+      Queue<RunnerApi.ArtifactInformation> pendingGets;
+      BlockingQueue<ByteString> currentOutput;
+
+      @Override
+      @SuppressFBWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "fallthrough intended")
+      public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper responseWrapper) {
+        switch (state) {
+          case START:
+            stagingToken = responseWrapper.getStagingToken();
+            LOG.info("Staging artifacts for {}.", stagingToken);
+            toResolve = toStage.get(stagingToken);
+            stagedFutures = new ConcurrentHashMap<>();
+            pendingResolves = new ArrayDeque<>();
+            pendingResolves.addAll(toResolve.keySet());
+            stagingExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+            totalPendingBytes = new OverflowingSemaphore(MAX_PENDING_BYTES);
+            resolveNextEnvironment(responseObserver);
+            break;
+
+          case RESOLVE:
+            {
+              currentEnvironment = pendingResolves.remove();
+              stagedFutures.put(currentEnvironment, new ArrayList<>());
+              pendingGets = new ArrayDeque<>();
+              for (RunnerApi.ArtifactInformation artifact :
+                  responseWrapper.getResolveArtifactResponse().getReplacementsList()) {
+                Optional<RunnerApi.ArtifactInformation> fetched = getLocal(artifact);
+                if (fetched.isPresent()) {
+                  stagedFutures
+                      .get(currentEnvironment)
+                      .add(new FutureTask<RunnerApi.ArtifactInformation>(() -> fetched.get()));
+                } else {
+                  pendingGets.add(artifact);
+                  responseObserver.onNext(
+                      ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                          .setGetArtifact(
+                              ArtifactApi.GetArtifactRequest.newBuilder().setArtifact(artifact))
+                          .build());
+                }
+              }
+              LOG.info(
+                  "Getting {} artifacts for {}.{}.",
+                  pendingGets.size(),
+                  stagingToken,
+                  pendingResolves.peek());
+              if (pendingGets.isEmpty()) {
+                resolveNextEnvironment(responseObserver);
+              } else {
+                state = State.GET;
+              }
+              break;
+            }
+
+          case GET:
+            RunnerApi.ArtifactInformation currentArtifact = pendingGets.remove();
+            String name = createFilename(nameIndex++, currentEnvironment, currentArtifact);
+            try {
+              LOG.debug("Storing artifacts for {} as {}", stagingToken, name);
+              currentOutput = new ArrayBlockingQueue<ByteString>(100);
+              stagedFutures
+                  .get(currentEnvironment)
+                  .add(
+                      stagingExecutor.submit(
+                          new StoreArtifact(
+                              stagingToken,
+                              name,
+                              currentArtifact,
+                              currentOutput,
+                              totalPendingBytes)));
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              responseObserver.onError(exn);
+            }
+            state = State.GETCHUNK;
+            // fall through
+
+          case GETCHUNK:
+            try {
+              ByteString chunk = responseWrapper.getGetArtifactResponse().getData();
+              if (chunk.size() > 0) {
+                totalPendingBytes.aquire(chunk.size());
+                currentOutput.put(chunk);
+              }
+              if (responseWrapper.getIsLast()) {
+                currentOutput.put(ByteString.EMPTY); // The EOF value.
+                if (pendingGets.isEmpty()) {
+                  resolveNextEnvironment(responseObserver);
+                } else {
+                  state = State.GET;
+                  LOG.debug("Waiting for {}", pendingGets.peek());
+                }
+              }
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              onError(exn);
+            }
+            break;
+
+          default:
+            responseObserver.onError(
+                new StatusException(
+                    Status.INVALID_ARGUMENT.withDescription("Illegal state " + state)));
+        }
+      }
+
+      private void resolveNextEnvironment(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        if (pendingResolves.isEmpty()) {
+          finishStaging(responseObserver);
+        } else {
+          state = State.RESOLVE;
+          LOG.info("Resolving artifacts for {}.{}.", stagingToken, pendingResolves.peek());
+          responseObserver.onNext(
+              ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                  .setResolveArtifact(
+                      ArtifactApi.ResolveArtifactsRequest.newBuilder()
+                          .addAllArtifacts(toResolve.get(pendingResolves.peek())))
+                  .build());
+        }
+      }
+
+      private void finishStaging(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        LOG.debug("Finishing staging for {}.", stagingToken);
+        Map<String, List<RunnerApi.ArtifactInformation>> staged = new HashMap<>();
+        try {
+          for (Map.Entry<String, List<Future<RunnerApi.ArtifactInformation>>> entry :
+              stagedFutures.entrySet()) {
+            List<RunnerApi.ArtifactInformation> envStaged = new ArrayList<>();
+            for (Future<RunnerApi.ArtifactInformation> future : entry.getValue()) {
+              envStaged.add(future.get());
+            }
+            staged.put(entry.getKey(), envStaged);
+          }
+          ArtifactStagingService.this.staged.put(stagingToken, staged);
+          stagingExecutor.shutdown();
+          state = State.DONE;
+          LOG.info("Artifacts fully staged for {}.", stagingToken);
+          responseObserver.onCompleted();
+        } catch (Exception exn) {
+          LOG.error("Error staging artifacts", exn);
+          responseObserver.onError(exn);
+          state = State.ERROR;
+          return;
+        }
+      }
+
+      /**
+       * Return an alternative artifact if we do not need to get this over the artifact API, or
+       * possibly at all.
+       */
+      private Optional<RunnerApi.ArtifactInformation> getLocal(
+          RunnerApi.ArtifactInformation artifact) {
+        return Optional.empty();
+      }
+
+      /**
+       * Attempts to provide a reasonable filename for the artifact.
+       *
+       * @param index a monotonically increasing index, which provides uniqueness
+       * @param environment the environment id
+       * @param artifact the artifact itself
+       */
+      private String createFilename(
+          int index, String environment, RunnerApi.ArtifactInformation artifact) {
+        String path;
+        try {
+          if (artifact.getRoleUrn().equals(ArtifactRetrievalService.STAGING_TO_ARTIFACT_URN)) {
+            path =
+                RunnerApi.ArtifactStagingToRolePayload.parseFrom(artifact.getRolePayload())
+                    .getStagedName();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.FILE_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload()).getPath();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.URL_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactUrlPayload.parseFrom(artifact.getTypePayload()).getUrl();
+          } else {
+            path = "artifact";
+          }
+        } catch (InvalidProtocolBufferException exn) {
+          throw new RuntimeException(exn);
+        }
+        // Limit to the last contiguous alpha-numeric sequence. In particular, this will exclude
+        // all path separators.
+        List<String> components = Splitter.onPattern("[^A-Za-z-_.]]").splitToList(path);
+        String base = components.get(components.size() - 1);
+        return clip(String.format("%d-%s-%s", index, clip(environment, 25), base), 100);
+      }
+
+      private String clip(String s, int maxLength) {
+        return s.length() < maxLength ? s : s.substring(0, maxLength);
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        stagingExecutor.shutdownNow();
+        LOG.error("Error staging artifacts", throwable);
+        state = State.ERROR;
+      }
+
+      @Override
+      public void onCompleted() {
+        assert state == State.DONE;
+      }
+    };
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Nothing to close.
+  }
+
+  /**
+   * Lazily stages artifacts by letting an ArtifactStagingService resolve and request artifacts.
+   *
+   * @param retrievalService an ArtifactRetrievalService used to resolve and retrieve artifacts
+   * @param stagingService an ArtifactStagingService stub which will request artifacts
+   * @param stagingToken the staging token of the job whose artifacts will be retrieved
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public static void offer(
+      ArtifactRetrievalService retrievalService,
+      ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stagingService,
+      String stagingToken)
+      throws InterruptedException, IOException {
+    StagingRequestObserver requestObserver = new StagingRequestObserver(retrievalService);
+    requestObserver.responseObserver =
+        stagingService.reverseArtifactRetrievalService(requestObserver);
+    requestObserver.responseObserver.onNext(
+        ArtifactApi.ArtifactResponseWrapper.newBuilder().setStagingToken(stagingToken).build());
+    requestObserver.waitUntilDone();
+    if (requestObserver.error != null) {
+      if (requestObserver.error instanceof IOException) {
+        throw (IOException) requestObserver.error;
+      } else {
+        throw new IOException(requestObserver.error);
+      }
+    }
+  }
+
+  /** Actually implements the reverse retrieval protocol. */
+  private static class StagingRequestObserver
+      implements StreamObserver<ArtifactApi.ArtifactRequestWrapper> {
+
+    private ArtifactRetrievalService retrievalService;
+
+    public StagingRequestObserver(ArtifactRetrievalService retrievalService) {
+      this.retrievalService = retrievalService;
+    }
+
+    CountDownLatch latch = new CountDownLatch(1);
+    StreamObserver<ArtifactApi.ArtifactResponseWrapper> responseObserver;
+    Throwable error;
 
 Review comment:
   ```suggestion
       private final ArtifactRetrievalService retrievalService;
       private final CountDownLatch latch = new CountDownLatch(1);
       private final StreamObserver<ArtifactApi.ArtifactResponseWrapper> responseObserver;
       private Throwable error;
   
       public StagingRequestObserver(ArtifactRetrievalService retrievalService) {
         this.retrievalService = retrievalService;
       }
   
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410513283
 
 

 ##########
 File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/ArtifactRetrievalServiceTest.java
 ##########
 @@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ArtifactRetrievalServiceTest {
+  private static final int TEST_BUFFER_SIZE = 1 << 10;
+  private GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  private ArtifactRetrievalService retrievalService;
+  private ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceStub retrievalStub;
+  private ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalBlockingStub;
+  private Path stagingDir;
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception {
+    retrievalService = new ArtifactRetrievalService(TEST_BUFFER_SIZE);
+    retrievalServer =
+        GrpcFnServer.allocatePortAndCreateFor(retrievalService, InProcessServerFactory.create());
+    ManagedChannel retrievalChannel =
 
 Review comment:
   Consider using the GrpcCleanupRule as seen here:
   https://github.com/grpc/grpc-java/blob/68297d6d7c17453eeae0e0ffbce03edc1eda0a12/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldServerTest.java#L49

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] robertwb commented on issue #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
robertwb commented on issue #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#issuecomment-610804809
 
 
   Run Java PreCommit

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410513072
 
 

 ##########
 File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingServiceTest.java
 ##########
 @@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ArtifactStagingServiceTest {
+  private static final int TEST_BUFFER_SIZE = 1 << 10;
+  private GrpcFnServer<ArtifactStagingService> stagingServer;
+  private ArtifactStagingService stagingService;
+  private GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  private ArtifactRetrievalService retrievalService;
+  private ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stagingStub;
+  private ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalBlockingStub;
+  private Path stagingDir;
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception {
+    stagingDir = tempFolder.newFolder("staging").toPath();
+    stagingService =
+        new ArtifactStagingService(
+            ArtifactStagingService.beamFilesystemArtifactDestinationProvider(
+                stagingDir.toString()));
+    stagingServer =
+        GrpcFnServer.allocatePortAndCreateFor(stagingService, InProcessServerFactory.create());
+    ManagedChannel stagingChannel =
+        InProcessChannelBuilder.forName(stagingServer.getApiServiceDescriptor().getUrl()).build();
+    stagingStub = ArtifactStagingServiceGrpc.newStub(stagingChannel);
+
+    retrievalService = new ArtifactRetrievalService(TEST_BUFFER_SIZE);
+    retrievalServer =
+        GrpcFnServer.allocatePortAndCreateFor(retrievalService, InProcessServerFactory.create());
+    ManagedChannel retrievalChannel =
+        InProcessChannelBuilder.forName(retrievalServer.getApiServiceDescriptor().getUrl()).build();
+    retrievalBlockingStub = ArtifactRetrievalServiceGrpc.newBlockingStub(retrievalChannel);
+  }
+
+  private static class FakeArtifactRetrievalService extends ArtifactRetrievalService {
+
+    @Override
+    public void resolveArtifacts(
+        ArtifactApi.ResolveArtifactsRequest request,
+        StreamObserver<ArtifactApi.ResolveArtifactsResponse> responseObserver) {
+      ArtifactApi.ResolveArtifactsResponse.Builder response =
+          ArtifactApi.ResolveArtifactsResponse.newBuilder();
+      for (RunnerApi.ArtifactInformation artifact : request.getArtifactsList()) {
+        if (artifact.getTypeUrn().equals("resolved")) {
+          response.addReplacements(artifact);
+        } else if (artifact.getTypeUrn().equals("unresolved")) {
+          response.addReplacements(artifact.toBuilder().setTypeUrn("resolved").build());
+        } else {
+          throw new UnsupportedOperationException(artifact.getTypeUrn());
+        }
+      }
+      responseObserver.onNext(response.build());
+      responseObserver.onCompleted();
+    }
+
+    @Override
+    public void getArtifact(
+        ArtifactApi.GetArtifactRequest request,
+        StreamObserver<ArtifactApi.GetArtifactResponse> responseObserver) {
+      if (request.getArtifact().getTypeUrn().equals("resolved")) {
+        ByteString data = request.getArtifact().getTypePayload();
+        responseObserver.onNext(
+            ArtifactApi.GetArtifactResponse.newBuilder().setData(data.substring(0, 1)).build());
+        responseObserver.onNext(
+            ArtifactApi.GetArtifactResponse.newBuilder().setData(data.substring(1)).build());
+        responseObserver.onCompleted();
+      } else {
+        throw new UnsupportedOperationException(request.getArtifact().getTypeUrn());
+      }
+    }
+
+    public static RunnerApi.ArtifactInformation resolvedArtifact(String contents) {
+      return RunnerApi.ArtifactInformation.newBuilder()
+          .setTypeUrn("resolved")
+          .setTypePayload(ByteString.copyFromUtf8(contents))
+          .setRoleUrn(contents)
+          .build();
+    }
+
+    public static RunnerApi.ArtifactInformation unresolvedArtifact(String contents) {
+      return RunnerApi.ArtifactInformation.newBuilder()
+          .setTypeUrn("unresolved")
+          .setTypePayload(ByteString.copyFromUtf8(contents))
+          .setRoleUrn(contents)
+          .build();
+    }
+  }
+
+  private String getArtifact(RunnerApi.ArtifactInformation artifact) {
+    ByteString all = ByteString.EMPTY;
+    Iterator<ArtifactApi.GetArtifactResponse> response =
+        retrievalBlockingStub.getArtifact(
+            ArtifactApi.GetArtifactRequest.newBuilder().setArtifact(artifact).build());
+    while (response.hasNext()) {
+      all = all.concat(response.next().getData());
+    }
+    return all.toStringUtf8();
+  }
+
+  @Test
+  public void testStageArtifacts() throws IOException, InterruptedException {
+    List<String> contentsList =
+        ImmutableList.of("a", "bb", Strings.repeat("xyz", TEST_BUFFER_SIZE * 3 / 4));
+    stagingService.registerJob(
+        "stagingToken",
+        ImmutableMap.of(
+            "env1",
+            Lists.transform(contentsList, FakeArtifactRetrievalService::resolvedArtifact),
+            "env2",
+            Lists.transform(contentsList, FakeArtifactRetrievalService::unresolvedArtifact)));
+    ArtifactStagingService.offer(new FakeArtifactRetrievalService(), stagingStub, "stagingToken");
+    Map<String, List<RunnerApi.ArtifactInformation>> staged =
+        stagingService.getStagedArtifacts("stagingToken");
+    assertEquals(2, staged.size());
+    checkArtifacts(contentsList, staged.get("env1"));
+    checkArtifacts(contentsList, staged.get("env2"));
+  }
+
+  private void checkArtifacts(
+      Collection<String> expetedContents, List<RunnerApi.ArtifactInformation> staged) {
 
 Review comment:
   `expetedContents` -> `expectedContents`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410511440
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
+          chunk = bytesQueue.take();
+        }
+        dest.getOutputStream().close();
+        return originalArtifact
+            .toBuilder()
+            .setTypeUrn(dest.getTypeUrn())
+            .setTypePayload(dest.getTypePayload())
+            .build();
+      } catch (IOException | InterruptedException exn) {
+        // As this thread will no longer be draining the queue, we don't want to get stuck writing
+        // to it.
+        totalPendingBytes.setException(exn);
+        LOG.error("Exception staging artifacts", exn);
+        if (exn instanceof IOException) {
+          throw (IOException) exn;
+        } else {
+          throw new RuntimeException(exn);
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(
+      StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+
+    return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() {
+
+      /** The maximum number of parallel threads to use to stage. */
+      public static final int THREAD_POOL_SIZE = 10;
+
+      /** The maximum number of bytes to buffer across all writes before throttling. */
+      public static final int MAX_PENDING_BYTES = 100 << 20; // 100 MB
+
+      String stagingToken;
+      Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
+      Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
+      ExecutorService stagingExecutor;
+      OverflowingSemaphore totalPendingBytes;
+
+      State state = State.START;
+      Queue<String> pendingResolves;
+      String currentEnvironment;
+      int nameIndex;
+      Queue<RunnerApi.ArtifactInformation> pendingGets;
+      BlockingQueue<ByteString> currentOutput;
+
+      @Override
+      @SuppressFBWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "fallthrough intended")
+      public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper responseWrapper) {
+        switch (state) {
+          case START:
+            stagingToken = responseWrapper.getStagingToken();
+            LOG.info("Staging artifacts for {}.", stagingToken);
+            toResolve = toStage.get(stagingToken);
+            stagedFutures = new ConcurrentHashMap<>();
+            pendingResolves = new ArrayDeque<>();
+            pendingResolves.addAll(toResolve.keySet());
+            stagingExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+            totalPendingBytes = new OverflowingSemaphore(MAX_PENDING_BYTES);
+            resolveNextEnvironment(responseObserver);
+            break;
+
+          case RESOLVE:
+            {
+              currentEnvironment = pendingResolves.remove();
+              stagedFutures.put(currentEnvironment, new ArrayList<>());
+              pendingGets = new ArrayDeque<>();
+              for (RunnerApi.ArtifactInformation artifact :
+                  responseWrapper.getResolveArtifactResponse().getReplacementsList()) {
+                Optional<RunnerApi.ArtifactInformation> fetched = getLocal(artifact);
+                if (fetched.isPresent()) {
+                  stagedFutures
+                      .get(currentEnvironment)
+                      .add(new FutureTask<RunnerApi.ArtifactInformation>(() -> fetched.get()));
+                } else {
+                  pendingGets.add(artifact);
+                  responseObserver.onNext(
+                      ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                          .setGetArtifact(
+                              ArtifactApi.GetArtifactRequest.newBuilder().setArtifact(artifact))
+                          .build());
+                }
+              }
+              LOG.info(
+                  "Getting {} artifacts for {}.{}.",
+                  pendingGets.size(),
+                  stagingToken,
+                  pendingResolves.peek());
+              if (pendingGets.isEmpty()) {
+                resolveNextEnvironment(responseObserver);
+              } else {
+                state = State.GET;
+              }
+              break;
+            }
+
+          case GET:
+            RunnerApi.ArtifactInformation currentArtifact = pendingGets.remove();
+            String name = createFilename(nameIndex++, currentEnvironment, currentArtifact);
+            try {
+              LOG.debug("Storing artifacts for {} as {}", stagingToken, name);
+              currentOutput = new ArrayBlockingQueue<ByteString>(100);
+              stagedFutures
+                  .get(currentEnvironment)
+                  .add(
+                      stagingExecutor.submit(
+                          new StoreArtifact(
+                              stagingToken,
+                              name,
+                              currentArtifact,
+                              currentOutput,
+                              totalPendingBytes)));
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              responseObserver.onError(exn);
+            }
+            state = State.GETCHUNK;
+            // fall through
+
+          case GETCHUNK:
+            try {
+              ByteString chunk = responseWrapper.getGetArtifactResponse().getData();
+              if (chunk.size() > 0) {
+                totalPendingBytes.aquire(chunk.size());
+                currentOutput.put(chunk);
+              }
+              if (responseWrapper.getIsLast()) {
+                currentOutput.put(ByteString.EMPTY); // The EOF value.
+                if (pendingGets.isEmpty()) {
+                  resolveNextEnvironment(responseObserver);
+                } else {
+                  state = State.GET;
+                  LOG.debug("Waiting for {}", pendingGets.peek());
+                }
+              }
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              onError(exn);
+            }
+            break;
+
+          default:
+            responseObserver.onError(
+                new StatusException(
+                    Status.INVALID_ARGUMENT.withDescription("Illegal state " + state)));
+        }
+      }
+
+      private void resolveNextEnvironment(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        if (pendingResolves.isEmpty()) {
+          finishStaging(responseObserver);
+        } else {
+          state = State.RESOLVE;
+          LOG.info("Resolving artifacts for {}.{}.", stagingToken, pendingResolves.peek());
+          responseObserver.onNext(
+              ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                  .setResolveArtifact(
+                      ArtifactApi.ResolveArtifactsRequest.newBuilder()
+                          .addAllArtifacts(toResolve.get(pendingResolves.peek())))
+                  .build());
+        }
+      }
+
+      private void finishStaging(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        LOG.debug("Finishing staging for {}.", stagingToken);
+        Map<String, List<RunnerApi.ArtifactInformation>> staged = new HashMap<>();
+        try {
+          for (Map.Entry<String, List<Future<RunnerApi.ArtifactInformation>>> entry :
+              stagedFutures.entrySet()) {
+            List<RunnerApi.ArtifactInformation> envStaged = new ArrayList<>();
+            for (Future<RunnerApi.ArtifactInformation> future : entry.getValue()) {
+              envStaged.add(future.get());
+            }
+            staged.put(entry.getKey(), envStaged);
+          }
+          ArtifactStagingService.this.staged.put(stagingToken, staged);
+          stagingExecutor.shutdown();
+          state = State.DONE;
+          LOG.info("Artifacts fully staged for {}.", stagingToken);
+          responseObserver.onCompleted();
+        } catch (Exception exn) {
+          LOG.error("Error staging artifacts", exn);
+          responseObserver.onError(exn);
+          state = State.ERROR;
+          return;
+        }
+      }
+
+      /**
+       * Return an alternative artifact if we do not need to get this over the artifact API, or
+       * possibly at all.
+       */
+      private Optional<RunnerApi.ArtifactInformation> getLocal(
+          RunnerApi.ArtifactInformation artifact) {
+        return Optional.empty();
+      }
+
+      /**
+       * Attempts to provide a reasonable filename for the artifact.
+       *
+       * @param index a monotonically increasing index, which provides uniqueness
+       * @param environment the environment id
+       * @param artifact the artifact itself
+       */
+      private String createFilename(
+          int index, String environment, RunnerApi.ArtifactInformation artifact) {
+        String path;
+        try {
+          if (artifact.getRoleUrn().equals(ArtifactRetrievalService.STAGING_TO_ARTIFACT_URN)) {
+            path =
+                RunnerApi.ArtifactStagingToRolePayload.parseFrom(artifact.getRolePayload())
+                    .getStagedName();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.FILE_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload()).getPath();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.URL_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactUrlPayload.parseFrom(artifact.getTypePayload()).getUrl();
+          } else {
+            path = "artifact";
+          }
+        } catch (InvalidProtocolBufferException exn) {
+          throw new RuntimeException(exn);
+        }
+        // Limit to the last contiguous alpha-numeric sequence. In particular, this will exclude
+        // all path separators.
+        List<String> components = Splitter.onPattern("[^A-Za-z-_.]]").splitToList(path);
+        String base = components.get(components.size() - 1);
+        return clip(String.format("%d-%s-%s", index, clip(environment, 25), base), 100);
+      }
+
+      private String clip(String s, int maxLength) {
+        return s.length() < maxLength ? s : s.substring(0, maxLength);
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        stagingExecutor.shutdownNow();
+        LOG.error("Error staging artifacts", throwable);
+        state = State.ERROR;
+      }
+
+      @Override
+      public void onCompleted() {
+        assert state == State.DONE;
+      }
+    };
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Nothing to close.
+  }
+
+  /**
+   * Lazily stages artifacts by letting an ArtifactStagingService resolve and request artifacts.
+   *
+   * @param retrievalService an ArtifactRetrievalService used to resolve and retrieve artifacts
+   * @param stagingService an ArtifactStagingService stub which will request artifacts
+   * @param stagingToken the staging token of the job whose artifacts will be retrieved
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public static void offer(
+      ArtifactRetrievalService retrievalService,
+      ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stagingService,
+      String stagingToken)
+      throws InterruptedException, IOException {
+    StagingRequestObserver requestObserver = new StagingRequestObserver(retrievalService);
+    requestObserver.responseObserver =
+        stagingService.reverseArtifactRetrievalService(requestObserver);
+    requestObserver.responseObserver.onNext(
+        ArtifactApi.ArtifactResponseWrapper.newBuilder().setStagingToken(stagingToken).build());
+    requestObserver.waitUntilDone();
+    if (requestObserver.error != null) {
+      if (requestObserver.error instanceof IOException) {
+        throw (IOException) requestObserver.error;
+      } else {
+        throw new IOException(requestObserver.error);
+      }
+    }
+  }
+
+  /** Actually implements the reverse retrieval protocol. */
+  private static class StagingRequestObserver
+      implements StreamObserver<ArtifactApi.ArtifactRequestWrapper> {
+
+    private ArtifactRetrievalService retrievalService;
+
+    public StagingRequestObserver(ArtifactRetrievalService retrievalService) {
+      this.retrievalService = retrievalService;
+    }
+
+    CountDownLatch latch = new CountDownLatch(1);
 
 Review comment:
   You could use a CompletableFuture<Void> instead of `latch` and `error`. In the onCompleted/onError methods you complete the future. `waitUntilDone` would then just be `future.get`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410505526
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
+          chunk = bytesQueue.take();
+        }
+        dest.getOutputStream().close();
+        return originalArtifact
+            .toBuilder()
+            .setTypeUrn(dest.getTypeUrn())
+            .setTypePayload(dest.getTypePayload())
+            .build();
+      } catch (IOException | InterruptedException exn) {
+        // As this thread will no longer be draining the queue, we don't want to get stuck writing
+        // to it.
+        totalPendingBytes.setException(exn);
+        LOG.error("Exception staging artifacts", exn);
+        if (exn instanceof IOException) {
+          throw (IOException) exn;
+        } else {
+          throw new RuntimeException(exn);
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(
+      StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+
+    return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() {
+
+      /** The maximum number of parallel threads to use to stage. */
+      public static final int THREAD_POOL_SIZE = 10;
+
+      /** The maximum number of bytes to buffer across all writes before throttling. */
+      public static final int MAX_PENDING_BYTES = 100 << 20; // 100 MB
+
+      String stagingToken;
+      Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
+      Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
+      ExecutorService stagingExecutor;
+      OverflowingSemaphore totalPendingBytes;
+
+      State state = State.START;
+      Queue<String> pendingResolves;
+      String currentEnvironment;
+      int nameIndex;
+      Queue<RunnerApi.ArtifactInformation> pendingGets;
+      BlockingQueue<ByteString> currentOutput;
+
+      @Override
+      @SuppressFBWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "fallthrough intended")
+      public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper responseWrapper) {
+        switch (state) {
+          case START:
+            stagingToken = responseWrapper.getStagingToken();
+            LOG.info("Staging artifacts for {}.", stagingToken);
+            toResolve = toStage.get(stagingToken);
+            stagedFutures = new ConcurrentHashMap<>();
+            pendingResolves = new ArrayDeque<>();
+            pendingResolves.addAll(toResolve.keySet());
+            stagingExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+            totalPendingBytes = new OverflowingSemaphore(MAX_PENDING_BYTES);
+            resolveNextEnvironment(responseObserver);
+            break;
+
+          case RESOLVE:
+            {
+              currentEnvironment = pendingResolves.remove();
+              stagedFutures.put(currentEnvironment, new ArrayList<>());
+              pendingGets = new ArrayDeque<>();
+              for (RunnerApi.ArtifactInformation artifact :
+                  responseWrapper.getResolveArtifactResponse().getReplacementsList()) {
+                Optional<RunnerApi.ArtifactInformation> fetched = getLocal(artifact);
+                if (fetched.isPresent()) {
+                  stagedFutures
+                      .get(currentEnvironment)
+                      .add(new FutureTask<RunnerApi.ArtifactInformation>(() -> fetched.get()));
+                } else {
+                  pendingGets.add(artifact);
+                  responseObserver.onNext(
+                      ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                          .setGetArtifact(
+                              ArtifactApi.GetArtifactRequest.newBuilder().setArtifact(artifact))
+                          .build());
+                }
+              }
+              LOG.info(
+                  "Getting {} artifacts for {}.{}.",
+                  pendingGets.size(),
+                  stagingToken,
+                  pendingResolves.peek());
+              if (pendingGets.isEmpty()) {
+                resolveNextEnvironment(responseObserver);
+              } else {
+                state = State.GET;
+              }
+              break;
+            }
+
+          case GET:
+            RunnerApi.ArtifactInformation currentArtifact = pendingGets.remove();
+            String name = createFilename(nameIndex++, currentEnvironment, currentArtifact);
+            try {
+              LOG.debug("Storing artifacts for {} as {}", stagingToken, name);
+              currentOutput = new ArrayBlockingQueue<ByteString>(100);
+              stagedFutures
+                  .get(currentEnvironment)
+                  .add(
+                      stagingExecutor.submit(
+                          new StoreArtifact(
+                              stagingToken,
+                              name,
+                              currentArtifact,
+                              currentOutput,
+                              totalPendingBytes)));
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              responseObserver.onError(exn);
+            }
+            state = State.GETCHUNK;
+            // fall through
+
+          case GETCHUNK:
+            try {
+              ByteString chunk = responseWrapper.getGetArtifactResponse().getData();
+              if (chunk.size() > 0) {
+                totalPendingBytes.aquire(chunk.size());
+                currentOutput.put(chunk);
+              }
+              if (responseWrapper.getIsLast()) {
+                currentOutput.put(ByteString.EMPTY); // The EOF value.
 
 Review comment:
   An implementation could return an empty byte string but still say it isn't the last one which would incorrectly translate into EOF. Consider using an EOF bytestring instance object and doing == comparison to detect that its specifically the EOF instance.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410508728
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
+          chunk = bytesQueue.take();
+        }
+        dest.getOutputStream().close();
+        return originalArtifact
+            .toBuilder()
+            .setTypeUrn(dest.getTypeUrn())
+            .setTypePayload(dest.getTypePayload())
+            .build();
+      } catch (IOException | InterruptedException exn) {
+        // As this thread will no longer be draining the queue, we don't want to get stuck writing
+        // to it.
+        totalPendingBytes.setException(exn);
+        LOG.error("Exception staging artifacts", exn);
+        if (exn instanceof IOException) {
+          throw (IOException) exn;
+        } else {
+          throw new RuntimeException(exn);
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(
+      StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+
+    return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() {
+
+      /** The maximum number of parallel threads to use to stage. */
+      public static final int THREAD_POOL_SIZE = 10;
+
+      /** The maximum number of bytes to buffer across all writes before throttling. */
+      public static final int MAX_PENDING_BYTES = 100 << 20; // 100 MB
+
+      String stagingToken;
+      Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
+      Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
+      ExecutorService stagingExecutor;
+      OverflowingSemaphore totalPendingBytes;
+
+      State state = State.START;
+      Queue<String> pendingResolves;
+      String currentEnvironment;
+      int nameIndex;
+      Queue<RunnerApi.ArtifactInformation> pendingGets;
+      BlockingQueue<ByteString> currentOutput;
+
+      @Override
+      @SuppressFBWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "fallthrough intended")
+      public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper responseWrapper) {
+        switch (state) {
+          case START:
+            stagingToken = responseWrapper.getStagingToken();
+            LOG.info("Staging artifacts for {}.", stagingToken);
+            toResolve = toStage.get(stagingToken);
+            stagedFutures = new ConcurrentHashMap<>();
+            pendingResolves = new ArrayDeque<>();
+            pendingResolves.addAll(toResolve.keySet());
+            stagingExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+            totalPendingBytes = new OverflowingSemaphore(MAX_PENDING_BYTES);
+            resolveNextEnvironment(responseObserver);
+            break;
+
+          case RESOLVE:
+            {
+              currentEnvironment = pendingResolves.remove();
+              stagedFutures.put(currentEnvironment, new ArrayList<>());
+              pendingGets = new ArrayDeque<>();
+              for (RunnerApi.ArtifactInformation artifact :
+                  responseWrapper.getResolveArtifactResponse().getReplacementsList()) {
+                Optional<RunnerApi.ArtifactInformation> fetched = getLocal(artifact);
+                if (fetched.isPresent()) {
+                  stagedFutures
+                      .get(currentEnvironment)
+                      .add(new FutureTask<RunnerApi.ArtifactInformation>(() -> fetched.get()));
+                } else {
+                  pendingGets.add(artifact);
+                  responseObserver.onNext(
+                      ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                          .setGetArtifact(
+                              ArtifactApi.GetArtifactRequest.newBuilder().setArtifact(artifact))
+                          .build());
+                }
+              }
+              LOG.info(
+                  "Getting {} artifacts for {}.{}.",
+                  pendingGets.size(),
+                  stagingToken,
+                  pendingResolves.peek());
+              if (pendingGets.isEmpty()) {
+                resolveNextEnvironment(responseObserver);
+              } else {
+                state = State.GET;
+              }
+              break;
+            }
+
+          case GET:
+            RunnerApi.ArtifactInformation currentArtifact = pendingGets.remove();
+            String name = createFilename(nameIndex++, currentEnvironment, currentArtifact);
+            try {
+              LOG.debug("Storing artifacts for {} as {}", stagingToken, name);
+              currentOutput = new ArrayBlockingQueue<ByteString>(100);
+              stagedFutures
+                  .get(currentEnvironment)
+                  .add(
+                      stagingExecutor.submit(
+                          new StoreArtifact(
+                              stagingToken,
+                              name,
+                              currentArtifact,
+                              currentOutput,
+                              totalPendingBytes)));
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              responseObserver.onError(exn);
+            }
+            state = State.GETCHUNK;
+            // fall through
+
+          case GETCHUNK:
+            try {
+              ByteString chunk = responseWrapper.getGetArtifactResponse().getData();
+              if (chunk.size() > 0) {
+                totalPendingBytes.aquire(chunk.size());
+                currentOutput.put(chunk);
+              }
+              if (responseWrapper.getIsLast()) {
+                currentOutput.put(ByteString.EMPTY); // The EOF value.
+                if (pendingGets.isEmpty()) {
+                  resolveNextEnvironment(responseObserver);
+                } else {
+                  state = State.GET;
+                  LOG.debug("Waiting for {}", pendingGets.peek());
+                }
+              }
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              onError(exn);
+            }
+            break;
+
+          default:
+            responseObserver.onError(
+                new StatusException(
+                    Status.INVALID_ARGUMENT.withDescription("Illegal state " + state)));
+        }
+      }
+
+      private void resolveNextEnvironment(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        if (pendingResolves.isEmpty()) {
+          finishStaging(responseObserver);
+        } else {
+          state = State.RESOLVE;
+          LOG.info("Resolving artifacts for {}.{}.", stagingToken, pendingResolves.peek());
+          responseObserver.onNext(
+              ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                  .setResolveArtifact(
+                      ArtifactApi.ResolveArtifactsRequest.newBuilder()
+                          .addAllArtifacts(toResolve.get(pendingResolves.peek())))
+                  .build());
+        }
+      }
+
+      private void finishStaging(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        LOG.debug("Finishing staging for {}.", stagingToken);
+        Map<String, List<RunnerApi.ArtifactInformation>> staged = new HashMap<>();
+        try {
+          for (Map.Entry<String, List<Future<RunnerApi.ArtifactInformation>>> entry :
+              stagedFutures.entrySet()) {
+            List<RunnerApi.ArtifactInformation> envStaged = new ArrayList<>();
+            for (Future<RunnerApi.ArtifactInformation> future : entry.getValue()) {
+              envStaged.add(future.get());
+            }
+            staged.put(entry.getKey(), envStaged);
+          }
+          ArtifactStagingService.this.staged.put(stagingToken, staged);
+          stagingExecutor.shutdown();
+          state = State.DONE;
+          LOG.info("Artifacts fully staged for {}.", stagingToken);
+          responseObserver.onCompleted();
+        } catch (Exception exn) {
+          LOG.error("Error staging artifacts", exn);
+          responseObserver.onError(exn);
+          state = State.ERROR;
+          return;
+        }
+      }
+
+      /**
+       * Return an alternative artifact if we do not need to get this over the artifact API, or
+       * possibly at all.
+       */
+      private Optional<RunnerApi.ArtifactInformation> getLocal(
+          RunnerApi.ArtifactInformation artifact) {
+        return Optional.empty();
+      }
+
+      /**
+       * Attempts to provide a reasonable filename for the artifact.
+       *
+       * @param index a monotonically increasing index, which provides uniqueness
+       * @param environment the environment id
+       * @param artifact the artifact itself
+       */
+      private String createFilename(
+          int index, String environment, RunnerApi.ArtifactInformation artifact) {
+        String path;
+        try {
+          if (artifact.getRoleUrn().equals(ArtifactRetrievalService.STAGING_TO_ARTIFACT_URN)) {
+            path =
+                RunnerApi.ArtifactStagingToRolePayload.parseFrom(artifact.getRolePayload())
+                    .getStagedName();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.FILE_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload()).getPath();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.URL_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactUrlPayload.parseFrom(artifact.getTypePayload()).getUrl();
+          } else {
+            path = "artifact";
+          }
+        } catch (InvalidProtocolBufferException exn) {
+          throw new RuntimeException(exn);
+        }
+        // Limit to the last contiguous alpha-numeric sequence. In particular, this will exclude
+        // all path separators.
+        List<String> components = Splitter.onPattern("[^A-Za-z-_.]]").splitToList(path);
+        String base = components.get(components.size() - 1);
+        return clip(String.format("%d-%s-%s", index, clip(environment, 25), base), 100);
+      }
+
+      private String clip(String s, int maxLength) {
+        return s.length() < maxLength ? s : s.substring(0, maxLength);
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        stagingExecutor.shutdownNow();
+        LOG.error("Error staging artifacts", throwable);
+        state = State.ERROR;
+      }
+
+      @Override
+      public void onCompleted() {
+        assert state == State.DONE;
 
 Review comment:
   assert isn't run unless a JVM flag is enabled, did you want to use checkState instead and/or responseObserver.error(...)?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410506079
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
+          chunk = bytesQueue.take();
+        }
+        dest.getOutputStream().close();
+        return originalArtifact
+            .toBuilder()
+            .setTypeUrn(dest.getTypeUrn())
+            .setTypePayload(dest.getTypePayload())
+            .build();
+      } catch (IOException | InterruptedException exn) {
+        // As this thread will no longer be draining the queue, we don't want to get stuck writing
+        // to it.
+        totalPendingBytes.setException(exn);
+        LOG.error("Exception staging artifacts", exn);
+        if (exn instanceof IOException) {
+          throw (IOException) exn;
+        } else {
+          throw new RuntimeException(exn);
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(
+      StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+
+    return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() {
+
+      /** The maximum number of parallel threads to use to stage. */
+      public static final int THREAD_POOL_SIZE = 10;
+
+      /** The maximum number of bytes to buffer across all writes before throttling. */
+      public static final int MAX_PENDING_BYTES = 100 << 20; // 100 MB
+
+      String stagingToken;
+      Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
+      Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
+      ExecutorService stagingExecutor;
+      OverflowingSemaphore totalPendingBytes;
+
+      State state = State.START;
+      Queue<String> pendingResolves;
+      String currentEnvironment;
+      int nameIndex;
+      Queue<RunnerApi.ArtifactInformation> pendingGets;
+      BlockingQueue<ByteString> currentOutput;
+
+      @Override
+      @SuppressFBWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "fallthrough intended")
+      public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper responseWrapper) {
+        switch (state) {
+          case START:
+            stagingToken = responseWrapper.getStagingToken();
+            LOG.info("Staging artifacts for {}.", stagingToken);
+            toResolve = toStage.get(stagingToken);
+            stagedFutures = new ConcurrentHashMap<>();
+            pendingResolves = new ArrayDeque<>();
+            pendingResolves.addAll(toResolve.keySet());
+            stagingExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+            totalPendingBytes = new OverflowingSemaphore(MAX_PENDING_BYTES);
+            resolveNextEnvironment(responseObserver);
+            break;
+
+          case RESOLVE:
+            {
+              currentEnvironment = pendingResolves.remove();
+              stagedFutures.put(currentEnvironment, new ArrayList<>());
+              pendingGets = new ArrayDeque<>();
+              for (RunnerApi.ArtifactInformation artifact :
+                  responseWrapper.getResolveArtifactResponse().getReplacementsList()) {
+                Optional<RunnerApi.ArtifactInformation> fetched = getLocal(artifact);
+                if (fetched.isPresent()) {
+                  stagedFutures
+                      .get(currentEnvironment)
+                      .add(new FutureTask<RunnerApi.ArtifactInformation>(() -> fetched.get()));
+                } else {
+                  pendingGets.add(artifact);
+                  responseObserver.onNext(
+                      ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                          .setGetArtifact(
+                              ArtifactApi.GetArtifactRequest.newBuilder().setArtifact(artifact))
+                          .build());
+                }
+              }
+              LOG.info(
+                  "Getting {} artifacts for {}.{}.",
+                  pendingGets.size(),
+                  stagingToken,
+                  pendingResolves.peek());
+              if (pendingGets.isEmpty()) {
+                resolveNextEnvironment(responseObserver);
+              } else {
+                state = State.GET;
+              }
+              break;
+            }
+
+          case GET:
+            RunnerApi.ArtifactInformation currentArtifact = pendingGets.remove();
+            String name = createFilename(nameIndex++, currentEnvironment, currentArtifact);
 
 Review comment:
   Consider using IdGenerators.incrementingLongs and storing a static final reference within this class instead of using nameIndex:
   https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/IdGenerators.java#L26

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410512766
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
 
 Review comment:
   Did you want to make this static so that we are only buffering 100mb for the entire process or did you want this limit per file?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410509119
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
+          chunk = bytesQueue.take();
+        }
+        dest.getOutputStream().close();
+        return originalArtifact
+            .toBuilder()
+            .setTypeUrn(dest.getTypeUrn())
+            .setTypePayload(dest.getTypePayload())
+            .build();
+      } catch (IOException | InterruptedException exn) {
+        // As this thread will no longer be draining the queue, we don't want to get stuck writing
+        // to it.
+        totalPendingBytes.setException(exn);
+        LOG.error("Exception staging artifacts", exn);
+        if (exn instanceof IOException) {
+          throw (IOException) exn;
+        } else {
+          throw new RuntimeException(exn);
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(
+      StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+
+    return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() {
+
+      /** The maximum number of parallel threads to use to stage. */
+      public static final int THREAD_POOL_SIZE = 10;
+
+      /** The maximum number of bytes to buffer across all writes before throttling. */
+      public static final int MAX_PENDING_BYTES = 100 << 20; // 100 MB
+
+      String stagingToken;
+      Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
+      Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
+      ExecutorService stagingExecutor;
+      OverflowingSemaphore totalPendingBytes;
+
+      State state = State.START;
+      Queue<String> pendingResolves;
+      String currentEnvironment;
+      int nameIndex;
+      Queue<RunnerApi.ArtifactInformation> pendingGets;
+      BlockingQueue<ByteString> currentOutput;
+
+      @Override
+      @SuppressFBWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "fallthrough intended")
+      public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper responseWrapper) {
+        switch (state) {
+          case START:
+            stagingToken = responseWrapper.getStagingToken();
+            LOG.info("Staging artifacts for {}.", stagingToken);
+            toResolve = toStage.get(stagingToken);
+            stagedFutures = new ConcurrentHashMap<>();
+            pendingResolves = new ArrayDeque<>();
+            pendingResolves.addAll(toResolve.keySet());
+            stagingExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+            totalPendingBytes = new OverflowingSemaphore(MAX_PENDING_BYTES);
+            resolveNextEnvironment(responseObserver);
+            break;
+
+          case RESOLVE:
+            {
+              currentEnvironment = pendingResolves.remove();
+              stagedFutures.put(currentEnvironment, new ArrayList<>());
+              pendingGets = new ArrayDeque<>();
+              for (RunnerApi.ArtifactInformation artifact :
+                  responseWrapper.getResolveArtifactResponse().getReplacementsList()) {
+                Optional<RunnerApi.ArtifactInformation> fetched = getLocal(artifact);
+                if (fetched.isPresent()) {
+                  stagedFutures
+                      .get(currentEnvironment)
+                      .add(new FutureTask<RunnerApi.ArtifactInformation>(() -> fetched.get()));
+                } else {
+                  pendingGets.add(artifact);
+                  responseObserver.onNext(
+                      ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                          .setGetArtifact(
+                              ArtifactApi.GetArtifactRequest.newBuilder().setArtifact(artifact))
+                          .build());
+                }
+              }
+              LOG.info(
+                  "Getting {} artifacts for {}.{}.",
+                  pendingGets.size(),
+                  stagingToken,
+                  pendingResolves.peek());
+              if (pendingGets.isEmpty()) {
+                resolveNextEnvironment(responseObserver);
+              } else {
+                state = State.GET;
+              }
+              break;
+            }
+
+          case GET:
+            RunnerApi.ArtifactInformation currentArtifact = pendingGets.remove();
+            String name = createFilename(nameIndex++, currentEnvironment, currentArtifact);
+            try {
+              LOG.debug("Storing artifacts for {} as {}", stagingToken, name);
+              currentOutput = new ArrayBlockingQueue<ByteString>(100);
+              stagedFutures
+                  .get(currentEnvironment)
+                  .add(
+                      stagingExecutor.submit(
+                          new StoreArtifact(
+                              stagingToken,
+                              name,
+                              currentArtifact,
+                              currentOutput,
+                              totalPendingBytes)));
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              responseObserver.onError(exn);
+            }
+            state = State.GETCHUNK;
+            // fall through
+
+          case GETCHUNK:
+            try {
+              ByteString chunk = responseWrapper.getGetArtifactResponse().getData();
+              if (chunk.size() > 0) {
+                totalPendingBytes.aquire(chunk.size());
+                currentOutput.put(chunk);
+              }
+              if (responseWrapper.getIsLast()) {
+                currentOutput.put(ByteString.EMPTY); // The EOF value.
+                if (pendingGets.isEmpty()) {
+                  resolveNextEnvironment(responseObserver);
+                } else {
+                  state = State.GET;
+                  LOG.debug("Waiting for {}", pendingGets.peek());
+                }
+              }
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              onError(exn);
+            }
+            break;
+
+          default:
+            responseObserver.onError(
+                new StatusException(
+                    Status.INVALID_ARGUMENT.withDescription("Illegal state " + state)));
+        }
+      }
+
+      private void resolveNextEnvironment(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        if (pendingResolves.isEmpty()) {
+          finishStaging(responseObserver);
+        } else {
+          state = State.RESOLVE;
+          LOG.info("Resolving artifacts for {}.{}.", stagingToken, pendingResolves.peek());
+          responseObserver.onNext(
+              ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                  .setResolveArtifact(
+                      ArtifactApi.ResolveArtifactsRequest.newBuilder()
+                          .addAllArtifacts(toResolve.get(pendingResolves.peek())))
+                  .build());
+        }
+      }
+
+      private void finishStaging(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        LOG.debug("Finishing staging for {}.", stagingToken);
+        Map<String, List<RunnerApi.ArtifactInformation>> staged = new HashMap<>();
+        try {
+          for (Map.Entry<String, List<Future<RunnerApi.ArtifactInformation>>> entry :
+              stagedFutures.entrySet()) {
+            List<RunnerApi.ArtifactInformation> envStaged = new ArrayList<>();
+            for (Future<RunnerApi.ArtifactInformation> future : entry.getValue()) {
+              envStaged.add(future.get());
+            }
+            staged.put(entry.getKey(), envStaged);
+          }
+          ArtifactStagingService.this.staged.put(stagingToken, staged);
+          stagingExecutor.shutdown();
+          state = State.DONE;
+          LOG.info("Artifacts fully staged for {}.", stagingToken);
+          responseObserver.onCompleted();
+        } catch (Exception exn) {
+          LOG.error("Error staging artifacts", exn);
+          responseObserver.onError(exn);
+          state = State.ERROR;
+          return;
+        }
+      }
+
+      /**
+       * Return an alternative artifact if we do not need to get this over the artifact API, or
+       * possibly at all.
+       */
+      private Optional<RunnerApi.ArtifactInformation> getLocal(
+          RunnerApi.ArtifactInformation artifact) {
+        return Optional.empty();
+      }
+
+      /**
+       * Attempts to provide a reasonable filename for the artifact.
+       *
+       * @param index a monotonically increasing index, which provides uniqueness
+       * @param environment the environment id
+       * @param artifact the artifact itself
+       */
+      private String createFilename(
+          int index, String environment, RunnerApi.ArtifactInformation artifact) {
+        String path;
+        try {
+          if (artifact.getRoleUrn().equals(ArtifactRetrievalService.STAGING_TO_ARTIFACT_URN)) {
+            path =
+                RunnerApi.ArtifactStagingToRolePayload.parseFrom(artifact.getRolePayload())
+                    .getStagedName();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.FILE_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload()).getPath();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.URL_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactUrlPayload.parseFrom(artifact.getTypePayload()).getUrl();
+          } else {
+            path = "artifact";
+          }
+        } catch (InvalidProtocolBufferException exn) {
+          throw new RuntimeException(exn);
+        }
+        // Limit to the last contiguous alpha-numeric sequence. In particular, this will exclude
+        // all path separators.
+        List<String> components = Splitter.onPattern("[^A-Za-z-_.]]").splitToList(path);
+        String base = components.get(components.size() - 1);
+        return clip(String.format("%d-%s-%s", index, clip(environment, 25), base), 100);
+      }
+
+      private String clip(String s, int maxLength) {
+        return s.length() < maxLength ? s : s.substring(0, maxLength);
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        stagingExecutor.shutdownNow();
+        LOG.error("Error staging artifacts", throwable);
+        state = State.ERROR;
+      }
+
+      @Override
+      public void onCompleted() {
+        assert state == State.DONE;
+      }
+    };
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Nothing to close.
+  }
+
+  /**
+   * Lazily stages artifacts by letting an ArtifactStagingService resolve and request artifacts.
+   *
+   * @param retrievalService an ArtifactRetrievalService used to resolve and retrieve artifacts
+   * @param stagingService an ArtifactStagingService stub which will request artifacts
+   * @param stagingToken the staging token of the job whose artifacts will be retrieved
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public static void offer(
+      ArtifactRetrievalService retrievalService,
+      ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stagingService,
+      String stagingToken)
+      throws InterruptedException, IOException {
+    StagingRequestObserver requestObserver = new StagingRequestObserver(retrievalService);
+    requestObserver.responseObserver =
+        stagingService.reverseArtifactRetrievalService(requestObserver);
+    requestObserver.responseObserver.onNext(
+        ArtifactApi.ArtifactResponseWrapper.newBuilder().setStagingToken(stagingToken).build());
+    requestObserver.waitUntilDone();
+    if (requestObserver.error != null) {
+      if (requestObserver.error instanceof IOException) {
+        throw (IOException) requestObserver.error;
+      } else {
+        throw new IOException(requestObserver.error);
+      }
+    }
+  }
+
+  /** Actually implements the reverse retrieval protocol. */
+  private static class StagingRequestObserver
+      implements StreamObserver<ArtifactApi.ArtifactRequestWrapper> {
+
+    private ArtifactRetrievalService retrievalService;
+
+    public StagingRequestObserver(ArtifactRetrievalService retrievalService) {
+      this.retrievalService = retrievalService;
+    }
+
+    CountDownLatch latch = new CountDownLatch(1);
+    StreamObserver<ArtifactApi.ArtifactResponseWrapper> responseObserver;
+    Throwable error;
+
+    @Override
+    public void onNext(ArtifactApi.ArtifactRequestWrapper requestWrapper) {
+      assert responseObserver != null;
 
 Review comment:
   checkState?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] robertwb commented on issue #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
robertwb commented on issue #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#issuecomment-612245120
 
 
   R: @lukecwik 
   CC: @ihji 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r409053390
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
 
 Review comment:
   nit: `statingToken` -> `stagingToken`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410513258
 
 

 ##########
 File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingServiceTest.java
 ##########
 @@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ArtifactStagingServiceTest {
+  private static final int TEST_BUFFER_SIZE = 1 << 10;
+  private GrpcFnServer<ArtifactStagingService> stagingServer;
+  private ArtifactStagingService stagingService;
+  private GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  private ArtifactRetrievalService retrievalService;
+  private ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stagingStub;
+  private ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalBlockingStub;
+  private Path stagingDir;
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception {
 
 Review comment:
   Consider using the GrpcCleanupRule as seen here:
   https://github.com/grpc/grpc-java/blob/68297d6d7c17453eeae0e0ffbce03edc1eda0a12/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldServerTest.java#L49

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410501167
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
+          chunk = bytesQueue.take();
+        }
+        dest.getOutputStream().close();
+        return originalArtifact
+            .toBuilder()
+            .setTypeUrn(dest.getTypeUrn())
+            .setTypePayload(dest.getTypePayload())
+            .build();
+      } catch (IOException | InterruptedException exn) {
+        // As this thread will no longer be draining the queue, we don't want to get stuck writing
+        // to it.
+        totalPendingBytes.setException(exn);
+        LOG.error("Exception staging artifacts", exn);
+        if (exn instanceof IOException) {
+          throw (IOException) exn;
+        } else {
+          throw new RuntimeException(exn);
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(
+      StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+
+    return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() {
+
+      /** The maximum number of parallel threads to use to stage. */
+      public static final int THREAD_POOL_SIZE = 10;
+
+      /** The maximum number of bytes to buffer across all writes before throttling. */
+      public static final int MAX_PENDING_BYTES = 100 << 20; // 100 MB
+
+      String stagingToken;
+      Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
+      Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
+      ExecutorService stagingExecutor;
+      OverflowingSemaphore totalPendingBytes;
+
+      State state = State.START;
+      Queue<String> pendingResolves;
+      String currentEnvironment;
+      int nameIndex;
+      Queue<RunnerApi.ArtifactInformation> pendingGets;
+      BlockingQueue<ByteString> currentOutput;
+
+      @Override
+      @SuppressFBWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "fallthrough intended")
+      public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper responseWrapper) {
+        switch (state) {
+          case START:
+            stagingToken = responseWrapper.getStagingToken();
+            LOG.info("Staging artifacts for {}.", stagingToken);
+            toResolve = toStage.get(stagingToken);
+            stagedFutures = new ConcurrentHashMap<>();
+            pendingResolves = new ArrayDeque<>();
+            pendingResolves.addAll(toResolve.keySet());
+            stagingExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+            totalPendingBytes = new OverflowingSemaphore(MAX_PENDING_BYTES);
+            resolveNextEnvironment(responseObserver);
+            break;
+
+          case RESOLVE:
+            {
+              currentEnvironment = pendingResolves.remove();
+              stagedFutures.put(currentEnvironment, new ArrayList<>());
+              pendingGets = new ArrayDeque<>();
+              for (RunnerApi.ArtifactInformation artifact :
+                  responseWrapper.getResolveArtifactResponse().getReplacementsList()) {
+                Optional<RunnerApi.ArtifactInformation> fetched = getLocal(artifact);
+                if (fetched.isPresent()) {
+                  stagedFutures
+                      .get(currentEnvironment)
+                      .add(new FutureTask<RunnerApi.ArtifactInformation>(() -> fetched.get()));
 
 Review comment:
   Consider using CompletedFuture.completedFuture(fetched.get())
   
   https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#completedFuture-U-

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410509918
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
+          chunk = bytesQueue.take();
+        }
+        dest.getOutputStream().close();
+        return originalArtifact
+            .toBuilder()
+            .setTypeUrn(dest.getTypeUrn())
+            .setTypePayload(dest.getTypePayload())
+            .build();
+      } catch (IOException | InterruptedException exn) {
+        // As this thread will no longer be draining the queue, we don't want to get stuck writing
+        // to it.
+        totalPendingBytes.setException(exn);
+        LOG.error("Exception staging artifacts", exn);
+        if (exn instanceof IOException) {
+          throw (IOException) exn;
+        } else {
+          throw new RuntimeException(exn);
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(
+      StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+
+    return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() {
+
+      /** The maximum number of parallel threads to use to stage. */
+      public static final int THREAD_POOL_SIZE = 10;
+
+      /** The maximum number of bytes to buffer across all writes before throttling. */
+      public static final int MAX_PENDING_BYTES = 100 << 20; // 100 MB
+
+      String stagingToken;
+      Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
+      Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
+      ExecutorService stagingExecutor;
+      OverflowingSemaphore totalPendingBytes;
+
+      State state = State.START;
+      Queue<String> pendingResolves;
+      String currentEnvironment;
+      int nameIndex;
+      Queue<RunnerApi.ArtifactInformation> pendingGets;
+      BlockingQueue<ByteString> currentOutput;
+
+      @Override
+      @SuppressFBWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "fallthrough intended")
+      public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper responseWrapper) {
+        switch (state) {
+          case START:
+            stagingToken = responseWrapper.getStagingToken();
+            LOG.info("Staging artifacts for {}.", stagingToken);
+            toResolve = toStage.get(stagingToken);
+            stagedFutures = new ConcurrentHashMap<>();
+            pendingResolves = new ArrayDeque<>();
+            pendingResolves.addAll(toResolve.keySet());
+            stagingExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+            totalPendingBytes = new OverflowingSemaphore(MAX_PENDING_BYTES);
+            resolveNextEnvironment(responseObserver);
+            break;
+
+          case RESOLVE:
+            {
+              currentEnvironment = pendingResolves.remove();
+              stagedFutures.put(currentEnvironment, new ArrayList<>());
+              pendingGets = new ArrayDeque<>();
+              for (RunnerApi.ArtifactInformation artifact :
+                  responseWrapper.getResolveArtifactResponse().getReplacementsList()) {
+                Optional<RunnerApi.ArtifactInformation> fetched = getLocal(artifact);
+                if (fetched.isPresent()) {
+                  stagedFutures
+                      .get(currentEnvironment)
+                      .add(new FutureTask<RunnerApi.ArtifactInformation>(() -> fetched.get()));
+                } else {
+                  pendingGets.add(artifact);
+                  responseObserver.onNext(
+                      ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                          .setGetArtifact(
+                              ArtifactApi.GetArtifactRequest.newBuilder().setArtifact(artifact))
+                          .build());
+                }
+              }
+              LOG.info(
+                  "Getting {} artifacts for {}.{}.",
+                  pendingGets.size(),
+                  stagingToken,
+                  pendingResolves.peek());
+              if (pendingGets.isEmpty()) {
+                resolveNextEnvironment(responseObserver);
+              } else {
+                state = State.GET;
+              }
+              break;
+            }
+
+          case GET:
+            RunnerApi.ArtifactInformation currentArtifact = pendingGets.remove();
+            String name = createFilename(nameIndex++, currentEnvironment, currentArtifact);
+            try {
+              LOG.debug("Storing artifacts for {} as {}", stagingToken, name);
+              currentOutput = new ArrayBlockingQueue<ByteString>(100);
+              stagedFutures
+                  .get(currentEnvironment)
+                  .add(
+                      stagingExecutor.submit(
+                          new StoreArtifact(
+                              stagingToken,
+                              name,
+                              currentArtifact,
+                              currentOutput,
+                              totalPendingBytes)));
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              responseObserver.onError(exn);
+            }
+            state = State.GETCHUNK;
+            // fall through
+
+          case GETCHUNK:
+            try {
+              ByteString chunk = responseWrapper.getGetArtifactResponse().getData();
+              if (chunk.size() > 0) {
+                totalPendingBytes.aquire(chunk.size());
+                currentOutput.put(chunk);
+              }
+              if (responseWrapper.getIsLast()) {
+                currentOutput.put(ByteString.EMPTY); // The EOF value.
+                if (pendingGets.isEmpty()) {
+                  resolveNextEnvironment(responseObserver);
+                } else {
+                  state = State.GET;
+                  LOG.debug("Waiting for {}", pendingGets.peek());
+                }
+              }
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              onError(exn);
+            }
+            break;
+
+          default:
+            responseObserver.onError(
+                new StatusException(
+                    Status.INVALID_ARGUMENT.withDescription("Illegal state " + state)));
+        }
+      }
+
+      private void resolveNextEnvironment(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        if (pendingResolves.isEmpty()) {
+          finishStaging(responseObserver);
+        } else {
+          state = State.RESOLVE;
+          LOG.info("Resolving artifacts for {}.{}.", stagingToken, pendingResolves.peek());
+          responseObserver.onNext(
+              ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                  .setResolveArtifact(
+                      ArtifactApi.ResolveArtifactsRequest.newBuilder()
+                          .addAllArtifacts(toResolve.get(pendingResolves.peek())))
+                  .build());
+        }
+      }
+
+      private void finishStaging(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        LOG.debug("Finishing staging for {}.", stagingToken);
+        Map<String, List<RunnerApi.ArtifactInformation>> staged = new HashMap<>();
+        try {
+          for (Map.Entry<String, List<Future<RunnerApi.ArtifactInformation>>> entry :
+              stagedFutures.entrySet()) {
+            List<RunnerApi.ArtifactInformation> envStaged = new ArrayList<>();
+            for (Future<RunnerApi.ArtifactInformation> future : entry.getValue()) {
+              envStaged.add(future.get());
+            }
+            staged.put(entry.getKey(), envStaged);
+          }
+          ArtifactStagingService.this.staged.put(stagingToken, staged);
+          stagingExecutor.shutdown();
+          state = State.DONE;
+          LOG.info("Artifacts fully staged for {}.", stagingToken);
+          responseObserver.onCompleted();
+        } catch (Exception exn) {
+          LOG.error("Error staging artifacts", exn);
+          responseObserver.onError(exn);
+          state = State.ERROR;
+          return;
+        }
+      }
+
+      /**
+       * Return an alternative artifact if we do not need to get this over the artifact API, or
+       * possibly at all.
+       */
+      private Optional<RunnerApi.ArtifactInformation> getLocal(
+          RunnerApi.ArtifactInformation artifact) {
+        return Optional.empty();
+      }
+
+      /**
+       * Attempts to provide a reasonable filename for the artifact.
+       *
+       * @param index a monotonically increasing index, which provides uniqueness
+       * @param environment the environment id
+       * @param artifact the artifact itself
+       */
+      private String createFilename(
+          int index, String environment, RunnerApi.ArtifactInformation artifact) {
+        String path;
+        try {
+          if (artifact.getRoleUrn().equals(ArtifactRetrievalService.STAGING_TO_ARTIFACT_URN)) {
+            path =
+                RunnerApi.ArtifactStagingToRolePayload.parseFrom(artifact.getRolePayload())
+                    .getStagedName();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.FILE_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload()).getPath();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.URL_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactUrlPayload.parseFrom(artifact.getTypePayload()).getUrl();
+          } else {
+            path = "artifact";
+          }
+        } catch (InvalidProtocolBufferException exn) {
+          throw new RuntimeException(exn);
+        }
+        // Limit to the last contiguous alpha-numeric sequence. In particular, this will exclude
+        // all path separators.
+        List<String> components = Splitter.onPattern("[^A-Za-z-_.]]").splitToList(path);
+        String base = components.get(components.size() - 1);
+        return clip(String.format("%d-%s-%s", index, clip(environment, 25), base), 100);
+      }
+
+      private String clip(String s, int maxLength) {
+        return s.length() < maxLength ? s : s.substring(0, maxLength);
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        stagingExecutor.shutdownNow();
+        LOG.error("Error staging artifacts", throwable);
+        state = State.ERROR;
+      }
+
+      @Override
+      public void onCompleted() {
+        assert state == State.DONE;
+      }
+    };
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Nothing to close.
+  }
+
+  /**
+   * Lazily stages artifacts by letting an ArtifactStagingService resolve and request artifacts.
+   *
+   * @param retrievalService an ArtifactRetrievalService used to resolve and retrieve artifacts
+   * @param stagingService an ArtifactStagingService stub which will request artifacts
+   * @param stagingToken the staging token of the job whose artifacts will be retrieved
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public static void offer(
+      ArtifactRetrievalService retrievalService,
+      ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stagingService,
+      String stagingToken)
+      throws InterruptedException, IOException {
+    StagingRequestObserver requestObserver = new StagingRequestObserver(retrievalService);
+    requestObserver.responseObserver =
+        stagingService.reverseArtifactRetrievalService(requestObserver);
 
 Review comment:
   I had problems like this as well when using gRPC API. You should be able to pass in stagingService to the StagingRequestObserver constructor allowing the responseObserver to be final since you should be able to pass `this` to reverseArtifactRetrievalService

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410512493
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
+          chunk = bytesQueue.take();
+        }
+        dest.getOutputStream().close();
+        return originalArtifact
+            .toBuilder()
+            .setTypeUrn(dest.getTypeUrn())
+            .setTypePayload(dest.getTypePayload())
+            .build();
+      } catch (IOException | InterruptedException exn) {
+        // As this thread will no longer be draining the queue, we don't want to get stuck writing
+        // to it.
+        totalPendingBytes.setException(exn);
+        LOG.error("Exception staging artifacts", exn);
+        if (exn instanceof IOException) {
+          throw (IOException) exn;
+        } else {
+          throw new RuntimeException(exn);
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(
+      StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+
+    return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() {
+
+      /** The maximum number of parallel threads to use to stage. */
+      public static final int THREAD_POOL_SIZE = 10;
+
+      /** The maximum number of bytes to buffer across all writes before throttling. */
+      public static final int MAX_PENDING_BYTES = 100 << 20; // 100 MB
+
+      String stagingToken;
+      Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
+      Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
+      ExecutorService stagingExecutor;
+      OverflowingSemaphore totalPendingBytes;
+
+      State state = State.START;
+      Queue<String> pendingResolves;
+      String currentEnvironment;
+      int nameIndex;
+      Queue<RunnerApi.ArtifactInformation> pendingGets;
+      BlockingQueue<ByteString> currentOutput;
+
+      @Override
+      @SuppressFBWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "fallthrough intended")
+      public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper responseWrapper) {
 
 Review comment:
   why does this need to be synchronized?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410499146
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
 
 Review comment:
   This will avoid a copy being done via toByteArray
   ```suggestion
             chunk.writeTo(dest.getOutputStream());
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11342: [BEAM-9577] New artifact staging and retrieval service for Java.
URL: https://github.com/apache/beam/pull/11342#discussion_r410507938
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##########
 @@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArtifactStagingService
+    extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
+
+  private final ArtifactDestinationProvider destinationProvider;
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged =
+      new ConcurrentHashMap<>();
+
+  public ArtifactStagingService(ArtifactDestinationProvider destinationProvider) {
+    this.destinationProvider = destinationProvider;
+  }
+
+  /**
+   * Registers a set of artifacts to be staged with this service.
+   *
+   * <p>A client (e.g. a Beam SDK) is expected to connect to this service with the given staging
+   * token and offer resolution and retrieval of this set of artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   * @param artifacts all artifacts to stage, keyed by environment
+   */
+  public void registerJob(
+      String stagingToken, Map<String, List<RunnerApi.ArtifactInformation>> artifacts) {
+    assert !toStage.containsKey(stagingToken);
+    toStage.put(stagingToken, artifacts);
+  }
+
+  /**
+   * Returns the rewritten artifacts associated with this job, keyed by environment.
+   *
+   * <p>This should be called after the client has finished offering artifacts.
+   *
+   * @param stagingToken a staging token for this job
+   */
+  public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String stagingToken) {
+    toStage.remove(stagingToken);
+    return staged.remove(stagingToken);
+  }
+
+  /** Provides a concrete location to which artifacts can be staged on retrieval. */
+  public interface ArtifactDestinationProvider {
+    ArtifactDestination getDestination(String stagingToken, String name) throws IOException;
+  }
+
+  /**
+   * A pairing of a newly created artifact type and an ouptut stream that will be readable at that
+   * type.
+   */
+  @AutoValue
+  public abstract static class ArtifactDestination {
+    public static ArtifactDestination create(
+        String typeUrn, ByteString typePayload, OutputStream out) {
+      return new AutoValue_ArtifactStagingService_ArtifactDestination(typeUrn, typePayload, out);
+    }
+
+    public static ArtifactDestination fromFile(String path) throws IOException {
+      return fromFile(
+          path,
+          Channels.newOutputStream(
+              FileSystems.create(
+                  FileSystems.matchNewResource(path, false /* isDirectory */), MimeTypes.BINARY)));
+    }
+
+    public static ArtifactDestination fromFile(String path, OutputStream out) {
+      return create(
+          ArtifactRetrievalService.FILE_ARTIFACT_URN,
+          RunnerApi.ArtifactFilePayload.newBuilder().setPath(path).build().toByteString(),
+          out);
+    }
+
+    public abstract String getTypeUrn();
+
+    public abstract ByteString getTypePayload();
+
+    public abstract OutputStream getOutputStream();
+  }
+
+  /**
+   * An ArtifactDestinationProvider that places new artifacts as files in a Beam filesystem.
+   *
+   * @param root the directory in which to place all artifacts
+   */
+  public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
+    return (statingToken, name) -> {
+      ResourceId path =
+          FileSystems.matchNewResource(root, true)
+              .resolve(statingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+              .resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+      return ArtifactDestination.fromFile(path.toString());
+    };
+  }
+
+  private enum State {
+    START,
+    RESOLVE,
+    GET,
+    GETCHUNK,
+    DONE,
+    ERROR,
+  }
+
+  /**
+   * Like the standard Semaphore, but allows an aquire to go over the limit if there is any room.
+   *
+   * <p>Also allows setting an error, to avoid issues with un-released aquires after error.
+   */
+  private static class OverflowingSemaphore {
+    private int totalPermits;
+    private int usedPermits;
+    private Exception exception;
+
+    public OverflowingSemaphore(int totalPermits) {
+      this.totalPermits = totalPermits;
+      this.usedPermits = 0;
+    }
+
+    synchronized void aquire(int permits) throws Exception {
+      while (usedPermits >= totalPermits) {
+        if (exception != null) {
+          throw exception;
+        }
+        this.wait();
+      }
+      usedPermits += permits;
+    }
+
+    synchronized void release(int permits) {
+      usedPermits -= permits;
+      this.notifyAll();
+    }
+
+    synchronized void setException(Exception exception) {
+      this.exception = exception;
+      this.notifyAll();
+    }
+  }
+
+  /** A task that pulls bytes off a queue and actually writes them to a staging location. */
+  private class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
+
+    private String stagingToken;
+    private String name;
+    private RunnerApi.ArtifactInformation originalArtifact;
+    private BlockingQueue<ByteString> bytesQueue;
+    private OverflowingSemaphore totalPendingBytes;
+
+    public StoreArtifact(
+        String stagingToken,
+        String name,
+        RunnerApi.ArtifactInformation originalArtifact,
+        BlockingQueue<ByteString> bytesQueue,
+        OverflowingSemaphore totalPendingBytes) {
+      this.stagingToken = stagingToken;
+      this.name = name;
+      this.originalArtifact = originalArtifact;
+      this.bytesQueue = bytesQueue;
+      this.totalPendingBytes = totalPendingBytes;
+    }
+
+    @Override
+    public RunnerApi.ArtifactInformation call() throws IOException {
+      try {
+        ArtifactDestination dest = destinationProvider.getDestination(stagingToken, name);
+        LOG.debug("Storing artifact for {}.{} at {}", stagingToken, name, dest);
+        ByteString chunk = bytesQueue.take();
+        while (chunk.size() > 0) {
+          totalPendingBytes.release(chunk.size());
+          dest.getOutputStream().write(chunk.toByteArray());
+          chunk = bytesQueue.take();
+        }
+        dest.getOutputStream().close();
+        return originalArtifact
+            .toBuilder()
+            .setTypeUrn(dest.getTypeUrn())
+            .setTypePayload(dest.getTypePayload())
+            .build();
+      } catch (IOException | InterruptedException exn) {
+        // As this thread will no longer be draining the queue, we don't want to get stuck writing
+        // to it.
+        totalPendingBytes.setException(exn);
+        LOG.error("Exception staging artifacts", exn);
+        if (exn instanceof IOException) {
+          throw (IOException) exn;
+        } else {
+          throw new RuntimeException(exn);
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(
+      StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+
+    return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() {
+
+      /** The maximum number of parallel threads to use to stage. */
+      public static final int THREAD_POOL_SIZE = 10;
+
+      /** The maximum number of bytes to buffer across all writes before throttling. */
+      public static final int MAX_PENDING_BYTES = 100 << 20; // 100 MB
+
+      String stagingToken;
+      Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
+      Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
+      ExecutorService stagingExecutor;
+      OverflowingSemaphore totalPendingBytes;
+
+      State state = State.START;
+      Queue<String> pendingResolves;
+      String currentEnvironment;
+      int nameIndex;
+      Queue<RunnerApi.ArtifactInformation> pendingGets;
+      BlockingQueue<ByteString> currentOutput;
+
+      @Override
+      @SuppressFBWarnings(value = "SF_SWITCH_FALLTHROUGH", justification = "fallthrough intended")
+      public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper responseWrapper) {
+        switch (state) {
+          case START:
+            stagingToken = responseWrapper.getStagingToken();
+            LOG.info("Staging artifacts for {}.", stagingToken);
+            toResolve = toStage.get(stagingToken);
+            stagedFutures = new ConcurrentHashMap<>();
+            pendingResolves = new ArrayDeque<>();
+            pendingResolves.addAll(toResolve.keySet());
+            stagingExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+            totalPendingBytes = new OverflowingSemaphore(MAX_PENDING_BYTES);
+            resolveNextEnvironment(responseObserver);
+            break;
+
+          case RESOLVE:
+            {
+              currentEnvironment = pendingResolves.remove();
+              stagedFutures.put(currentEnvironment, new ArrayList<>());
+              pendingGets = new ArrayDeque<>();
+              for (RunnerApi.ArtifactInformation artifact :
+                  responseWrapper.getResolveArtifactResponse().getReplacementsList()) {
+                Optional<RunnerApi.ArtifactInformation> fetched = getLocal(artifact);
+                if (fetched.isPresent()) {
+                  stagedFutures
+                      .get(currentEnvironment)
+                      .add(new FutureTask<RunnerApi.ArtifactInformation>(() -> fetched.get()));
+                } else {
+                  pendingGets.add(artifact);
+                  responseObserver.onNext(
+                      ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                          .setGetArtifact(
+                              ArtifactApi.GetArtifactRequest.newBuilder().setArtifact(artifact))
+                          .build());
+                }
+              }
+              LOG.info(
+                  "Getting {} artifacts for {}.{}.",
+                  pendingGets.size(),
+                  stagingToken,
+                  pendingResolves.peek());
+              if (pendingGets.isEmpty()) {
+                resolveNextEnvironment(responseObserver);
+              } else {
+                state = State.GET;
+              }
+              break;
+            }
+
+          case GET:
+            RunnerApi.ArtifactInformation currentArtifact = pendingGets.remove();
+            String name = createFilename(nameIndex++, currentEnvironment, currentArtifact);
+            try {
+              LOG.debug("Storing artifacts for {} as {}", stagingToken, name);
+              currentOutput = new ArrayBlockingQueue<ByteString>(100);
+              stagedFutures
+                  .get(currentEnvironment)
+                  .add(
+                      stagingExecutor.submit(
+                          new StoreArtifact(
+                              stagingToken,
+                              name,
+                              currentArtifact,
+                              currentOutput,
+                              totalPendingBytes)));
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              responseObserver.onError(exn);
+            }
+            state = State.GETCHUNK;
+            // fall through
+
+          case GETCHUNK:
+            try {
+              ByteString chunk = responseWrapper.getGetArtifactResponse().getData();
+              if (chunk.size() > 0) {
+                totalPendingBytes.aquire(chunk.size());
+                currentOutput.put(chunk);
+              }
+              if (responseWrapper.getIsLast()) {
+                currentOutput.put(ByteString.EMPTY); // The EOF value.
+                if (pendingGets.isEmpty()) {
+                  resolveNextEnvironment(responseObserver);
+                } else {
+                  state = State.GET;
+                  LOG.debug("Waiting for {}", pendingGets.peek());
+                }
+              }
+            } catch (Exception exn) {
+              LOG.error("Error submitting.", exn);
+              onError(exn);
+            }
+            break;
+
+          default:
+            responseObserver.onError(
+                new StatusException(
+                    Status.INVALID_ARGUMENT.withDescription("Illegal state " + state)));
+        }
+      }
+
+      private void resolveNextEnvironment(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        if (pendingResolves.isEmpty()) {
+          finishStaging(responseObserver);
+        } else {
+          state = State.RESOLVE;
+          LOG.info("Resolving artifacts for {}.{}.", stagingToken, pendingResolves.peek());
+          responseObserver.onNext(
+              ArtifactApi.ArtifactRequestWrapper.newBuilder()
+                  .setResolveArtifact(
+                      ArtifactApi.ResolveArtifactsRequest.newBuilder()
+                          .addAllArtifacts(toResolve.get(pendingResolves.peek())))
+                  .build());
+        }
+      }
+
+      private void finishStaging(
+          StreamObserver<ArtifactApi.ArtifactRequestWrapper> responseObserver) {
+        LOG.debug("Finishing staging for {}.", stagingToken);
+        Map<String, List<RunnerApi.ArtifactInformation>> staged = new HashMap<>();
+        try {
+          for (Map.Entry<String, List<Future<RunnerApi.ArtifactInformation>>> entry :
+              stagedFutures.entrySet()) {
+            List<RunnerApi.ArtifactInformation> envStaged = new ArrayList<>();
+            for (Future<RunnerApi.ArtifactInformation> future : entry.getValue()) {
+              envStaged.add(future.get());
+            }
+            staged.put(entry.getKey(), envStaged);
+          }
+          ArtifactStagingService.this.staged.put(stagingToken, staged);
+          stagingExecutor.shutdown();
+          state = State.DONE;
+          LOG.info("Artifacts fully staged for {}.", stagingToken);
+          responseObserver.onCompleted();
+        } catch (Exception exn) {
+          LOG.error("Error staging artifacts", exn);
+          responseObserver.onError(exn);
+          state = State.ERROR;
+          return;
+        }
+      }
+
+      /**
+       * Return an alternative artifact if we do not need to get this over the artifact API, or
+       * possibly at all.
+       */
+      private Optional<RunnerApi.ArtifactInformation> getLocal(
+          RunnerApi.ArtifactInformation artifact) {
+        return Optional.empty();
+      }
+
+      /**
+       * Attempts to provide a reasonable filename for the artifact.
+       *
+       * @param index a monotonically increasing index, which provides uniqueness
+       * @param environment the environment id
+       * @param artifact the artifact itself
+       */
+      private String createFilename(
+          int index, String environment, RunnerApi.ArtifactInformation artifact) {
+        String path;
+        try {
+          if (artifact.getRoleUrn().equals(ArtifactRetrievalService.STAGING_TO_ARTIFACT_URN)) {
+            path =
+                RunnerApi.ArtifactStagingToRolePayload.parseFrom(artifact.getRolePayload())
+                    .getStagedName();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.FILE_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload()).getPath();
+          } else if (artifact.getTypeUrn().equals(ArtifactRetrievalService.URL_ARTIFACT_URN)) {
+            path = RunnerApi.ArtifactUrlPayload.parseFrom(artifact.getTypePayload()).getUrl();
+          } else {
+            path = "artifact";
+          }
+        } catch (InvalidProtocolBufferException exn) {
+          throw new RuntimeException(exn);
+        }
+        // Limit to the last contiguous alpha-numeric sequence. In particular, this will exclude
+        // all path separators.
+        List<String> components = Splitter.onPattern("[^A-Za-z-_.]]").splitToList(path);
 
 Review comment:
   This won't support Chinese/Russian/Japanese/.. users since they have alphabets that typically don't contain these characters.
   
   It might make more sense to use the full path string escaped instead of trying to figure out the base name by replacing the File.separator with something else like _
   See:
   https://docs.oracle.com/javase/8/docs/api/java/io/File.html#separator

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services