You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/29 21:06:07 UTC
[iceberg] 05/09: AWS: Add check to create staging directory if not exists for S3OutputStream (#3175)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.12.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit c739204c40dc5f7748b5f823c7f39892bc254239
Author: Rajarshi Sarkar <ra...@bitmesra.ac.in>
AuthorDate: Wed Sep 29 01:16:24 2021 +0530
AWS: Add check to create staging directory if not exists for S3OutputStream (#3175)
---
.../main/java/org/apache/iceberg/io/OutputFile.java | 1 +
.../org/apache/iceberg/aws/s3/S3OutputStream.java | 21 +++++++++++++++++++++
.../apache/iceberg/aws/s3/S3OutputStreamTest.java | 19 +++++++++++++++++++
3 files changed, 41 insertions(+)
diff --git a/api/src/main/java/org/apache/iceberg/io/OutputFile.java b/api/src/main/java/org/apache/iceberg/io/OutputFile.java
index 34b4e54..67195c4 100644
--- a/api/src/main/java/org/apache/iceberg/io/OutputFile.java
+++ b/api/src/main/java/org/apache/iceberg/io/OutputFile.java
@@ -48,6 +48,7 @@ public interface OutputFile {
*
* @return an output stream that can report its position
* @throws RuntimeIOException If the implementation throws an {@link IOException}
+ * @throws SecurityException If staging directory creation fails due to missing JVM level permission
*/
PositionOutputStream createOrOverwrite();
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 690dc9a..f66a296 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -175,6 +175,7 @@ class S3OutputStream extends PositionOutputStream {
stream.close();
}
+ createStagingDirectoryIfNotExists();
currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
currentStagingFile.deleteOnExit();
stagingFiles.add(currentStagingFile);
@@ -328,6 +329,26 @@ class S3OutputStream extends PositionOutputStream {
}
}
+ /**
+ * Makes sure the configured staging directory exists, creating it and any missing parent directories if needed.
+ * <p>
+ * If another thread or process creates the directory between the check and the {@code mkdirs()} call,
+ * that is treated as success.
+ *
+ * @throws IOException if the directory could not be created, or the path exists but is not a directory
+ * @throws SecurityException if a security manager denies access to the staging directory path
+ */
+ private void createStagingDirectoryIfNotExists() throws IOException, SecurityException {
+ if (stagingDirectory.isDirectory()) {
+ return;
+ }
+
+ if (stagingDirectory.exists()) {
+ // Something other than a directory occupies the path; fail here with a clear message instead of
+ // letting File.createTempFile fail later with an opaque error.
+ throw new IOException("Staging directory path exists but is not a directory: " +
+ stagingDirectory.getAbsolutePath());
+ }
+
+ LOG.info("Staging directory does not exist, trying to create one: {}",
+ stagingDirectory.getAbsolutePath());
+ if (stagingDirectory.mkdirs()) {
+ LOG.info("Successfully created staging directory: {}", stagingDirectory.getAbsolutePath());
+ } else if (stagingDirectory.isDirectory()) {
+ // mkdirs() returns false when the directory already exists, e.g. it was created concurrently.
+ LOG.info("Successfully created staging directory by another process: {}",
+ stagingDirectory.getAbsolutePath());
+ } else {
+ throw new IOException(
+ "Failed to create staging directory due to some unknown reason: " + stagingDirectory
+ .getAbsolutePath());
+ }
+ }
+
@SuppressWarnings("checkstyle:NoFinalizer")
@Override
protected void finalize() throws Throwable {
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java
index b4dc1ec..b0f8b73 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.aws.s3;
import com.adobe.testing.s3mock.junit4.S3MockRule;
+import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
@@ -29,6 +30,7 @@ import java.util.UUID;
import java.util.stream.Stream;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -72,6 +74,7 @@ public class S3OutputStreamTest {
private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3));
private final Random random = new Random(1);
private final Path tmpDir = Files.createTempDirectory("s3fileio-test-");
+ // Resolved under the freshly created tmpDir instead of a fixed "/tmp" path, so the staging directory
+ // does not pre-exist and separate or concurrent test runs cannot interfere with each other.
+ private final String newTmpDirectory = tmpDir.resolve("newStagingDirectory").toString();
private final AwsProperties properties = new AwsProperties(ImmutableMap.of(
AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024),
@@ -85,6 +88,14 @@ public class S3OutputStreamTest {
s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
}
+ /**
+ * Removes the staging directory used by {@link #testStagingDirectoryCreation()}, including any staging
+ * files left inside it (e.g. if a test fails before its stream is closed). {@link File#delete()} cannot
+ * remove a non-empty directory, so the children are deleted first. Cleanup is best-effort: individual
+ * delete failures are ignored.
+ */
+ @After
+ public void after() {
+ File newStagingDirectory = new File(newTmpDirectory);
+ // listFiles() returns null when the path does not exist or is not a directory.
+ File[] leftoverFiles = newStagingDirectory.listFiles();
+ if (leftoverFiles != null) {
+ for (File leftoverFile : leftoverFiles) {
+ leftoverFile.delete();
+ }
+ }
+ if (newStagingDirectory.exists()) {
+ newStagingDirectory.delete();
+ }
+ }
+
@Test
public void testWrite() {
// Run tests for both byte and array write paths
@@ -140,6 +151,14 @@ public class S3OutputStreamTest {
stream.close();
}
+ /**
+ * Verifies that opening an S3OutputStream whose configured staging directory does not exist yet
+ * creates that directory.
+ */
+ @Test
+ public void testStagingDirectoryCreation() throws IOException {
+ File stagingDirectory = new File(newTmpDirectory);
+ // Without this precondition the test would pass vacuously if the directory already existed.
+ org.junit.Assert.assertFalse("Staging directory should not exist before the stream is opened",
+ stagingDirectory.exists());
+
+ AwsProperties newStagingDirectoryAwsProperties = new AwsProperties(ImmutableMap.of(
+ AwsProperties.S3FILEIO_STAGING_DIRECTORY, newTmpDirectory));
+ S3OutputStream stream = new S3OutputStream(s3, randomURI(), newStagingDirectoryAwsProperties);
+ try {
+ org.junit.Assert.assertTrue("Staging directory should be created when the stream is opened",
+ stagingDirectory.isDirectory());
+ } finally {
+ stream.close();
+ }
+ }
+
private void writeAndVerify(S3Client client, S3URI uri, byte [] data, boolean arrayWrite) {
try (S3OutputStream stream = new S3OutputStream(client, uri, properties)) {
if (arrayWrite) {