You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/25 12:00:40 UTC

[GitHub] [beam] ramazan-yapparov commented on a change in pull request #13914: [BEAM-7637] Migration s3 on sdkv2

ramazan-yapparov commented on a change in pull request #13914:
URL: https://github.com/apache/beam/pull/13914#discussion_r582774754



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.s3;
+
+import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.aws2.options.S3ClientBuilderFactory;
+import org.apache.beam.sdk.io.aws2.options.S3Options;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.MoreFutures;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkServiceException;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
+import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
+import software.amazon.awssdk.services.s3.model.CopyPartResult;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
+
+/** {@link FileSystem} implementation for Amazon S3. */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+class S3FileSystem extends FileSystem<S3ResourceId> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class);
+
+  // Amazon S3 API: You can create a copy of your object up to 5 GB in a single atomic operation
+  // Ref. https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjectsExamples.html
+  // 5_368_709_120 bytes == 5 GiB (5 * 1024^3).
+  private static final long MAX_COPY_OBJECT_SIZE_BYTES = 5_368_709_120L;
+
+  // S3 API, delete-objects: "You may specify up to 1000 keys."
+  private static final int MAX_DELETE_OBJECTS_PER_REQUEST = 1000;
+
+  // Content encodings for which reads are treated as not seek-efficient (judging by the name).
+  // NOTE(review): confirm against the read path, which is not part of this excerpt.
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
+
+  // Non-final for testing.
+  // The constructor wraps the client builder in a memoizing Supplier, so the client is built on
+  // first get(); setS3Client() swaps in a fixed instance instead.
+  private Supplier<S3Client> s3Client;
+  private final S3Options options;
+  // Fixed-size pool of daemon threads, sized from S3Options#getS3ThreadPoolSize() in the
+  // constructor.
+  private final ListeningExecutorService executorService;
+
+  /**
+   * Creates a file system whose S3 client comes from the configured {@link
+   * S3ClientBuilderFactory}.
+   *
+   * @param options pipeline options; must be non-null, carry a non-null storage class, and
+   *     request a positive thread pool size
+   * @throws NullPointerException if {@code options} or its storage class is null
+   * @throws IllegalArgumentException if the configured thread pool size is not positive
+   */
+  S3FileSystem(S3Options options) {
+    this.options = checkNotNull(options, "options");
+
+    S3ClientBuilderFactory builderFactory =
+        InstanceBuilder.ofType(S3ClientBuilderFactory.class)
+            .fromClass(options.getS3ClientFactoryClass())
+            .build();
+    S3ClientBuilder clientBuilder = builderFactory.createBuilder(options);
+    // Wrap build() in a memoizing Supplier: no client is created unless S3 is actually used.
+    this.s3Client = Suppliers.memoize(clientBuilder::build);
+
+    checkNotNull(options.getS3StorageClass(), "storageClass");
+    int poolSize = options.getS3ThreadPoolSize();
+    checkArgument(poolSize > 0, "threadPoolSize");
+    this.executorService =
+        MoreExecutors.listeningDecorator(
+            Executors.newFixedThreadPool(
+                poolSize, new ThreadFactoryBuilder().setDaemon(true).build()));
+  }
+
+  /** Returns the URI scheme this file system serves, as defined by {@link S3ResourceId}. */
+  @Override
+  protected String getScheme() {
+    return S3ResourceId.SCHEME;
+  }
+
+  /** Replaces the lazily built client with the given fixed instance; for tests only. */
+  @VisibleForTesting
+  void setS3Client(S3Client s3) {
+    s3Client = Suppliers.ofInstance(s3);
+  }
+
+  /** Returns the current S3 client, obtained from the client supplier. */
+  @VisibleForTesting
+  S3Client getS3Client() {
+    return s3Client.get();
+  }
+
+  /**
+   * Resolves each spec to a {@link MatchResult}, returning results in the same order as {@code
+   * specs}.
+   *
+   * <p>Wildcard specs and literal specs are matched in two separate batches ({@link
+   * #matchGlobPaths} and {@link #matchNonGlobPaths}). The two result sequences are then
+   * interleaved back into input order.
+   *
+   * @throws IllegalStateException if a batch returns a different number of results than paths
+   */
+  @Override
+  protected List<MatchResult> match(List<String> specs) throws IOException {
+    List<S3ResourceId> paths =
+        specs.stream().map(S3ResourceId::fromUri).collect(Collectors.toList());
+
+    List<S3ResourceId> globs = new ArrayList<>();
+    List<S3ResourceId> nonGlobs = new ArrayList<>();
+    // One flag per spec recording which batch it went to, so input order can be restored below.
+    List<Boolean> isGlobBooleans = new ArrayList<>();
+    for (S3ResourceId path : paths) {
+      boolean isGlob = path.isWildcard();
+      (isGlob ? globs : nonGlobs).add(path);
+      isGlobBooleans.add(isGlob);
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (boolean isGlob : isGlobBooleans) {
+      Iterator<MatchResult> source = isGlob ? globMatches : nonGlobMatches;
+      checkState(
+          source.hasNext(),
+          isGlob
+              ? "Internal error encountered in S3Filesystem: expected more elements in globMatches."
+              : "Internal error encountered in S3Filesystem: expected more elements in nonGlobMatches.");
+      matchResults.add(source.next());
+    }
+
+    // Each batch must have produced exactly one result per submitted path.
+    checkState(
+        !globMatches.hasNext(),
+        "Internal error encountered in S3Filesystem: expected no more elements in globMatches.");
+    checkState(
+        !nonGlobMatches.hasNext(),
+        "Internal error encountered in S3Filesystem: expected no more elements in nonGlobMatches.");
+
+    return matchResults.build();
+  }
+
+  /** Gets {@link MatchResult} representing all objects that match wildcard-containing paths. */
+  @VisibleForTesting
+  List<MatchResult> matchGlobPaths(Collection<S3ResourceId> globPaths) throws IOException {

Review comment:
       Maybe rewrite this method using the Stream API instead of for loops?
   What do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org