You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2021/09/28 19:46:32 UTC

[iceberg] branch master updated: AWS: Add check to create staging directory if not exists for S3OutputStream (#3175)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fde388  AWS: Add check to create staging directory if not exists for S3OutputStream (#3175)
1fde388 is described below

commit 1fde388bafbfdf79689687217c902f9bccb4f4af
Author: Rajarshi Sarkar <ra...@bitmesra.ac.in>
AuthorDate: Wed Sep 29 01:16:24 2021 +0530

    AWS: Add check to create staging directory if not exists for S3OutputStream (#3175)
---
 .../main/java/org/apache/iceberg/io/OutputFile.java |  1 +
 .../org/apache/iceberg/aws/s3/S3OutputStream.java   | 21 +++++++++++++++++++++
 .../apache/iceberg/aws/s3/S3OutputStreamTest.java   | 19 +++++++++++++++++++
 3 files changed, 41 insertions(+)

diff --git a/api/src/main/java/org/apache/iceberg/io/OutputFile.java b/api/src/main/java/org/apache/iceberg/io/OutputFile.java
index 34b4e54..67195c4 100644
--- a/api/src/main/java/org/apache/iceberg/io/OutputFile.java
+++ b/api/src/main/java/org/apache/iceberg/io/OutputFile.java
@@ -48,6 +48,7 @@ public interface OutputFile {
    *
    * @return an output stream that can report its position
    * @throws RuntimeIOException If the implementation throws an {@link IOException}
+   * @throws SecurityException If the staging directory cannot be created because the JVM lacks the required permission
    */
   PositionOutputStream createOrOverwrite();
 
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 690dc9a..f66a296 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -175,6 +175,7 @@ class S3OutputStream extends PositionOutputStream {
       stream.close();
     }
 
+    createStagingDirectoryIfNotExists();
     currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
     currentStagingFile.deleteOnExit();
     stagingFiles.add(currentStagingFile);
@@ -328,6 +329,26 @@ class S3OutputStream extends PositionOutputStream {
     }
   }
 
+  /**
+   * Ensures that the local staging directory exists before a staging file is created in it.
+   * <p>
+   * A missing directory (including missing parents) is created with {@link File#mkdirs()}. A
+   * {@code false} result from {@code mkdirs()} is tolerated when the directory exists afterwards,
+   * which covers the directory being created concurrently by someone else.
+   *
+   * @throws IOException if the staging path exists but is not a directory, or if it cannot be created
+   * @throws SecurityException if a security manager denies access to the staging path
+   */
+  private void createStagingDirectoryIfNotExists() throws IOException, SecurityException {
+    if (stagingDirectory.isDirectory()) {
+      return;
+    }
+
+    String stagingPath = stagingDirectory.getAbsolutePath();
+    if (stagingDirectory.exists()) {
+      // Fail fast with a specific message instead of letting File.createTempFile fail later.
+      throw new IOException("Staging path exists but is not a directory: " + stagingPath);
+    }
+
+    LOG.info("Staging directory does not exist, trying to create one: {}", stagingPath);
+    if (stagingDirectory.mkdirs()) {
+      LOG.info("Successfully created staging directory: {}", stagingPath);
+    } else if (stagingDirectory.isDirectory()) {
+      // mkdirs() returns false when the directory was created between the check and the call.
+      LOG.info("Successfully created staging directory by another process: {}", stagingPath);
+    } else {
+      throw new IOException(
+          "Failed to create staging directory due to some unknown reason: " + stagingPath);
+    }
+  }
+
   @SuppressWarnings("checkstyle:NoFinalizer")
   @Override
   protected void finalize() throws Throwable {
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java
index b4dc1ec..b0f8b73 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.aws.s3;
 
 import com.adobe.testing.s3mock.junit4.S3MockRule;
+import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.file.Files;
@@ -29,6 +30,7 @@ import java.util.UUID;
 import java.util.stream.Stream;
 import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -72,6 +74,7 @@ public class S3OutputStreamTest {
   private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3));
   private final Random random = new Random(1);
   private final Path tmpDir = Files.createTempDirectory("s3fileio-test-");
+  private final String newTmpDirectory = "/tmp/newStagingDirectory";
 
   private final AwsProperties properties = new AwsProperties(ImmutableMap.of(
       AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024),
@@ -85,6 +88,14 @@ public class S3OutputStreamTest {
     s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
   }
 
+  @After
+  public void after() {
+    File newStagingDirectory = new File(newTmpDirectory);
+    if (newStagingDirectory.exists()) {
+      newStagingDirectory.delete();
+    }
+  }
+
   @Test
   public void testWrite() {
     // Run tests for both byte and array write paths
@@ -140,6 +151,14 @@ public class S3OutputStreamTest {
     stream.close();
   }
 
+  /**
+   * Opens and closes a stream whose configured staging directory is {@code newTmpDirectory} and
+   * checks that the directory is present afterwards.
+   */
+  @Test
+  public void testStagingDirectoryCreation() throws IOException {
+    AwsProperties newStagingDirectoryAwsProperties = new AwsProperties(ImmutableMap.of(
+        AwsProperties.S3FILEIO_STAGING_DIRECTORY, newTmpDirectory));
+    S3OutputStream stream = new S3OutputStream(s3, randomURI(), newStagingDirectoryAwsProperties);
+    stream.close();
+
+    // NOTE(review): assumes the stream creates its first staging file (and therefore the staging
+    // directory) while being constructed, and that close() leaves the directory in place - confirm.
+    org.junit.Assert.assertTrue(
+        "Staging directory should have been created: " + newTmpDirectory,
+        new File(newTmpDirectory).isDirectory());
+  }
+
   private void writeAndVerify(S3Client client, S3URI uri, byte [] data, boolean arrayWrite) {
     try (S3OutputStream stream = new S3OutputStream(client, uri, properties)) {
       if (arrayWrite) {